Skip to content
95 changes: 95 additions & 0 deletions coderd/database/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,101 @@ func TestQueuePosition(t *testing.T) {
}
}

func TestAcquireProvisionerJob(t *testing.T) {
t.Parallel()

t.Run("HumanInitiatedJobsFirst", func(t *testing.T) {
t.Parallel()
var (
db, _ = dbtestutil.NewDB(t)
ctx = testutil.Context(t, testutil.WaitMedium)
org = dbgen.Organization(t, db, database.Organization{})
_ = dbgen.ProvisionerDaemon(t, db, database.ProvisionerDaemon{}) // Required for queue position
now = dbtime.Now()
numJobs = 10
humanIDs = make([]uuid.UUID, 0, numJobs/2)
prebuildIDs = make([]uuid.UUID, 0, numJobs/2)
)

// Given: a number of jobs in the queue, with prebuilds and non-prebuilds interleaved
for idx := range numJobs {
var initiator uuid.UUID
if idx%2 == 0 {
initiator = database.PrebuildsSystemUserID
} else {
initiator = uuid.MustParse("c0dec0de-c0de-c0de-c0de-c0dec0dec0de")
}
pj, err := db.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
ID: uuid.MustParse(fmt.Sprintf("00000000-0000-0000-0000-00000000000%x", idx+1)),
CreatedAt: time.Now().Add(-time.Second * time.Duration(idx)),
UpdatedAt: time.Now().Add(-time.Second * time.Duration(idx)),
InitiatorID: initiator,
OrganizationID: org.ID,
Provisioner: database.ProvisionerTypeEcho,
Type: database.ProvisionerJobTypeWorkspaceBuild,
StorageMethod: database.ProvisionerStorageMethodFile,
FileID: uuid.New(),
Input: json.RawMessage(`{}`),
Tags: database.StringMap{},
TraceMetadata: pqtype.NullRawMessage{},
})
require.NoError(t, err)
// We expected prebuilds to be acquired after human-initiated jobs.
if initiator == database.PrebuildsSystemUserID {
prebuildIDs = append([]uuid.UUID{pj.ID}, prebuildIDs...)
} else {
humanIDs = append([]uuid.UUID{pj.ID}, humanIDs...)
}
t.Logf("created job id=%q initiator=%q created_at=%q", pj.ID.String(), pj.InitiatorID.String(), pj.CreatedAt.String())
}

expectedIDs := append(humanIDs, prebuildIDs...) //nolint:gocritic // not the same slice

// When: we query the queue positions for the jobs
qjs, err := db.GetProvisionerJobsByIDsWithQueuePosition(ctx, database.GetProvisionerJobsByIDsWithQueuePositionParams{
IDs: expectedIDs,
StaleIntervalMS: provisionerdserver.StaleInterval.Milliseconds(),
})
require.NoError(t, err)
require.Len(t, qjs, numJobs)
// Ensure the jobs are sorted by queue position.
sort.Slice(qjs, func(i, j int) bool {
return qjs[i].QueuePosition < qjs[j].QueuePosition
})

// Then: the queue positions for the jobs should indicate the order in which
// they will be acquired, with human-initiated jobs first.
for idx, qj := range qjs {
t.Logf("queued job %d/%d id=%q initiator=%q created_at=%q queue_position=%d", idx+1, numJobs, qj.ProvisionerJob.ID.String(), qj.ProvisionerJob.InitiatorID.String(), qj.ProvisionerJob.CreatedAt.String(), qj.QueuePosition)
require.Equal(t, expectedIDs[idx].String(), qj.ProvisionerJob.ID.String(), "job %d/%d should match expected id", idx+1, numJobs)
require.Equal(t, int64(idx+1), qj.QueuePosition, "job %d/%d should have queue position %d", idx+1, numJobs, idx+1)
}

// When: the jobs are acquired
// Then: human-initiated jobs are prioritized first.
for idx := range numJobs {
acquired, err := db.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
OrganizationID: org.ID,
StartedAt: sql.NullTime{Time: time.Now(), Valid: true},
WorkerID: uuid.NullUUID{UUID: uuid.New(), Valid: true},
Types: []database.ProvisionerType{database.ProvisionerTypeEcho},
ProvisionerTags: json.RawMessage(`{}`),
})
require.NoError(t, err)
require.Equal(t, expectedIDs[idx].String(), acquired.ID.String(), "acquired job %d/%d with initiator %q", idx+1, numJobs, acquired.InitiatorID.String())
t.Logf("acquired job id=%q initiator=%q created_at=%q", acquired.ID.String(), acquired.InitiatorID.String(), acquired.CreatedAt.String())
err = db.UpdateProvisionerJobWithCompleteByID(ctx, database.UpdateProvisionerJobWithCompleteByIDParams{
ID: acquired.ID,
UpdatedAt: now,
CompletedAt: sql.NullTime{Time: now, Valid: true},
Error: sql.NullString{},
ErrorCode: sql.NullString{},
})
require.NoError(t, err, "mark job %d/%d as complete", idx+1, numJobs)
}
})
}

func TestUserLastSeenFilter(t *testing.T) {
t.Parallel()
if testing.Short() {
Expand Down
12 changes: 7 additions & 5 deletions coderd/database/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions coderd/database/queries/provisionerjobs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ WHERE
-- they are aliases and the code that calls this query already relies on a different type
AND provisioner_tagset_contains(@provisioner_tags :: jsonb, potential_job.tags :: jsonb)
ORDER BY
potential_job.created_at
-- Ensure that human-initiated jobs are prioritized over prebuilds.
potential_job.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC,
potential_job.created_at ASC
FOR UPDATE
SKIP LOCKED
LIMIT
Expand Down Expand Up @@ -74,7 +76,7 @@ WITH filtered_provisioner_jobs AS (
pending_jobs AS (
-- Step 2: Extract only pending jobs
SELECT
id, created_at, tags
id, initiator_id, created_at, tags
FROM
provisioner_jobs
WHERE
Expand All @@ -89,7 +91,7 @@ ranked_jobs AS (
SELECT
pj.id,
pj.created_at,
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.created_at ASC) AS queue_position,
ROW_NUMBER() OVER (PARTITION BY opd.id ORDER BY pj.initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, pj.created_at ASC) AS queue_position,
COUNT(*) OVER (PARTITION BY opd.id) AS queue_size
FROM
pending_jobs pj
Expand Down Expand Up @@ -128,7 +130,7 @@ ORDER BY
-- name: GetProvisionerJobsByOrganizationAndStatusWithQueuePositionAndProvisioner :many
WITH pending_jobs AS (
SELECT
id, created_at
id, initiator_id, created_at
FROM
provisioner_jobs
WHERE
Expand All @@ -143,7 +145,7 @@ WITH pending_jobs AS (
queue_position AS (
SELECT
id,
ROW_NUMBER() OVER (ORDER BY created_at ASC) AS queue_position
ROW_NUMBER() OVER (ORDER BY initiator_id = 'c42fdf75-3097-471c-8c33-fb52454d81c0'::uuid ASC, created_at ASC) AS queue_position
FROM
pending_jobs
),
Expand Down
Loading