Skip to content

Commit 40d1681

Browse files
committed
chore: wire up usage tracking for managed agents
1 parent dca0890 commit 40d1681

File tree

10 files changed

+210
-66
lines changed

10 files changed

+210
-66
lines changed

coderd/coderd.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/coder/coder/v2/coderd/oauth2provider"
2424
"github.com/coder/coder/v2/coderd/pproflabel"
2525
"github.com/coder/coder/v2/coderd/prebuilds"
26+
"github.com/coder/coder/v2/coderd/usage"
2627
"github.com/coder/coder/v2/coderd/wsbuilder"
2728

2829
"github.com/andybalholm/brotli"
@@ -200,6 +201,7 @@ type Options struct {
200201
TemplateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
201202
UserQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
202203
AccessControlStore *atomic.Pointer[dbauthz.AccessControlStore]
204+
UsageInserter *atomic.Pointer[usage.Inserter]
203205
// CoordinatorResumeTokenProvider is used to provide and validate resume
204206
// tokens issued by and passed to the coordinator DRPC API.
205207
CoordinatorResumeTokenProvider tailnet.ResumeTokenProvider
@@ -428,6 +430,13 @@ func New(options *Options) *API {
428430
v := schedule.NewAGPLUserQuietHoursScheduleStore()
429431
options.UserQuietHoursScheduleStore.Store(&v)
430432
}
433+
if options.UsageInserter == nil {
434+
options.UsageInserter = &atomic.Pointer[usage.Inserter]{}
435+
}
436+
if options.UsageInserter.Load() == nil {
437+
inserter := usage.NewAGPLInserter()
438+
options.UsageInserter.Store(&inserter)
439+
}
431440
if options.OneTimePasscodeValidityPeriod == 0 {
432441
options.OneTimePasscodeValidityPeriod = 20 * time.Minute
433442
}
@@ -590,6 +599,7 @@ func New(options *Options) *API {
590599
UserQuietHoursScheduleStore: options.UserQuietHoursScheduleStore,
591600
AccessControlStore: options.AccessControlStore,
592601
BuildUsageChecker: &buildUsageChecker,
602+
UsageInserter: options.UsageInserter,
593603
FileCache: files.New(options.PrometheusRegistry, options.Authorizer),
594604
Experiments: experiments,
595605
WebpushDispatcher: options.WebPushDispatcher,
@@ -1687,6 +1697,9 @@ type API struct {
16871697
// BuildUsageChecker is a pointer as it's passed around to multiple
16881698
// components.
16891699
BuildUsageChecker *atomic.Pointer[wsbuilder.UsageChecker]
1700+
// UsageInserter is a pointer to an atomic pointer because it is passed to
1701+
// multiple components.
1702+
UsageInserter *atomic.Pointer[usage.Inserter]
16901703

16911704
UpdatesProvider tailnet.WorkspaceUpdatesProvider
16921705

@@ -1902,6 +1915,7 @@ func (api *API) CreateInMemoryTaggedProvisionerDaemon(dialCtx context.Context, n
19021915
&api.Auditor,
19031916
api.TemplateScheduleStore,
19041917
api.UserQuietHoursScheduleStore,
1918+
api.UsageInserter,
19051919
api.DeploymentValues,
19061920
provisionerdserver.Options{
19071921
OIDCConfig: api.OIDCConfig,

coderd/provisionerdserver/provisionerdserver.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"cdr.dev/slog"
3131

32+
"github.com/coder/coder/v2/coderd/usage"
3233
"github.com/coder/coder/v2/coderd/util/slice"
3334

3435
"github.com/coder/coder/v2/codersdk/drpcsdk"
@@ -121,6 +122,7 @@ type server struct {
121122
DeploymentValues *codersdk.DeploymentValues
122123
NotificationsEnqueuer notifications.Enqueuer
123124
PrebuildsOrchestrator *atomic.Pointer[prebuilds.ReconciliationOrchestrator]
125+
UsageInserter *atomic.Pointer[usage.Inserter]
124126

125127
OIDCConfig promoauth.OAuth2Config
126128

@@ -174,6 +176,7 @@ func NewServer(
174176
auditor *atomic.Pointer[audit.Auditor],
175177
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore],
176178
userQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore],
179+
usageInserter *atomic.Pointer[usage.Inserter],
177180
deploymentValues *codersdk.DeploymentValues,
178181
options Options,
179182
enqueuer notifications.Enqueuer,
@@ -195,6 +198,9 @@ func NewServer(
195198
if userQuietHoursScheduleStore == nil {
196199
return nil, xerrors.New("userQuietHoursScheduleStore is nil")
197200
}
201+
if usageInserter == nil {
202+
return nil, xerrors.New("usageCollector is nil")
203+
}
198204
if deploymentValues == nil {
199205
return nil, xerrors.New("deploymentValues is nil")
200206
}
@@ -244,6 +250,7 @@ func NewServer(
244250
heartbeatInterval: options.HeartbeatInterval,
245251
heartbeatFn: options.HeartbeatFn,
246252
PrebuildsOrchestrator: prebuildsOrchestrator,
253+
UsageInserter: usageInserter,
247254
}
248255

249256
if s.heartbeatFn == nil {
@@ -2026,6 +2033,20 @@ func (s *server) completeWorkspaceBuildJob(ctx context.Context, job database.Pro
20262033
sidebarAppID = uuid.NullUUID{}
20272034
}
20282035

2036+
if hasAITask {
2037+
// Insert usage event for managed agents.
2038+
usageInserter := s.UsageInserter.Load()
2039+
if usageInserter != nil {
2040+
event := usage.DCManagedAgentsV1{
2041+
Count: 1,
2042+
}
2043+
err = (*usageInserter).InsertDiscreteUsageEvent(ctx, db, event)
2044+
if err != nil {
2045+
return xerrors.Errorf("insert %q event: %w", event.EventType(), err)
2046+
}
2047+
}
2048+
}
2049+
20292050
// Regardless of whether there is an AI task or not, update the field to indicate one way or the other since it
20302051
// always defaults to nil. ONLY if has_ai_task=true MUST ai_task_sidebar_app_id be set.
20312052
if err := db.UpdateWorkspaceBuildAITaskByID(ctx, database.UpdateWorkspaceBuildAITaskByIDParams{

coderd/provisionerdserver/provisionerdserver_test.go

Lines changed: 68 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
"github.com/coder/coder/v2/coderd/schedule"
4545
"github.com/coder/coder/v2/coderd/schedule/cron"
4646
"github.com/coder/coder/v2/coderd/telemetry"
47+
"github.com/coder/coder/v2/coderd/usage"
4748
"github.com/coder/coder/v2/coderd/wspubsub"
4849
"github.com/coder/coder/v2/codersdk"
4950
"github.com/coder/coder/v2/codersdk/agentsdk"
@@ -67,6 +68,13 @@ func testUserQuietHoursScheduleStore() *atomic.Pointer[schedule.UserQuietHoursSc
6768
return ptr
6869
}
6970

71+
func testUsageInserter() *atomic.Pointer[usage.Inserter] {
72+
ptr := &atomic.Pointer[usage.Inserter]{}
73+
inserter := usage.NewAGPLInserter()
74+
ptr.Store(&inserter)
75+
return ptr
76+
}
77+
7078
func TestAcquireJob_LongPoll(t *testing.T) {
7179
t.Parallel()
7280
//nolint:dogsled
@@ -2672,7 +2680,10 @@ func TestCompleteJob(t *testing.T) {
26722680
t.Run(tc.name, func(t *testing.T) {
26732681
t.Parallel()
26742682

2675-
srv, db, _, pd := setup(t, false, &overrides{})
2683+
fakeUsageInserter, usageInserterPtr := newFakeUsageInserter()
2684+
srv, db, _, pd := setup(t, false, &overrides{
2685+
usageInserter: usageInserterPtr,
2686+
})
26762687

26772688
importJobID := uuid.New()
26782689
tvID := uuid.New()
@@ -2741,6 +2752,10 @@ func TestCompleteJob(t *testing.T) {
27412752
require.NoError(t, err)
27422753
require.True(t, version.HasAITask.Valid) // We ALWAYS expect a value to be set, therefore not nil, i.e. valid = true.
27432754
require.Equal(t, tc.expected, version.HasAITask.Bool)
2755+
2756+
// We never expect a usage event to be collected for
2757+
// template imports.
2758+
require.Empty(t, fakeUsageInserter.collectedEvents)
27442759
})
27452760
}
27462761
})
@@ -2750,9 +2765,9 @@ func TestCompleteJob(t *testing.T) {
27502765
// will be set as well in that case.
27512766
t.Run("WorkspaceBuild", func(t *testing.T) {
27522767
type testcase struct {
2753-
name string
2754-
input *proto.CompletedJob_WorkspaceBuild
2755-
expected bool
2768+
name string
2769+
input *proto.CompletedJob_WorkspaceBuild
2770+
expectedHasAiTask bool
27562771
}
27572772

27582773
sidebarAppID := uuid.NewString()
@@ -2762,7 +2777,7 @@ func TestCompleteJob(t *testing.T) {
27622777
input: &proto.CompletedJob_WorkspaceBuild{
27632778
// No AiTasks defined.
27642779
},
2765-
expected: false,
2780+
expectedHasAiTask: false,
27662781
},
27672782
{
27682783
name: "has_ai_task is set to true",
@@ -2792,7 +2807,7 @@ func TestCompleteJob(t *testing.T) {
27922807
},
27932808
},
27942809
},
2795-
expected: true,
2810+
expectedHasAiTask: true,
27962811
},
27972812
// Checks regression for https://github.com/coder/coder/issues/18776
27982813
{
@@ -2808,13 +2823,16 @@ func TestCompleteJob(t *testing.T) {
28082823
},
28092824
},
28102825
},
2811-
expected: false,
2826+
expectedHasAiTask: false,
28122827
},
28132828
} {
28142829
t.Run(tc.name, func(t *testing.T) {
28152830
t.Parallel()
28162831

2817-
srv, db, _, pd := setup(t, false, &overrides{})
2832+
fakeUsageInserter, usageInserterPtr := newFakeUsageInserter()
2833+
srv, db, _, pd := setup(t, false, &overrides{
2834+
usageInserter: usageInserterPtr,
2835+
})
28182836

28192837
importJobID := uuid.New()
28202838
tvID := uuid.New()
@@ -2899,10 +2917,19 @@ func TestCompleteJob(t *testing.T) {
28992917
build, err = db.GetWorkspaceBuildByID(ctx, build.ID)
29002918
require.NoError(t, err)
29012919
require.True(t, build.HasAITask.Valid) // We ALWAYS expect a value to be set, therefore not nil, i.e. valid = true.
2902-
require.Equal(t, tc.expected, build.HasAITask.Bool)
2920+
require.Equal(t, tc.expectedHasAiTask, build.HasAITask.Bool)
29032921

2904-
if tc.expected {
2922+
if tc.expectedHasAiTask {
29052923
require.Equal(t, sidebarAppID, build.AITaskSidebarAppID.UUID.String())
2924+
2925+
// Check that a usage event was collected.
2926+
require.Len(t, fakeUsageInserter.collectedEvents, 1)
2927+
require.Equal(t, usage.DCManagedAgentsV1{
2928+
Count: 1,
2929+
}, fakeUsageInserter.collectedEvents[0])
2930+
} else {
2931+
// Check that no usage event was collected.
2932+
require.Empty(t, fakeUsageInserter.collectedEvents)
29062933
}
29072934
})
29082935
}
@@ -3835,6 +3862,7 @@ type overrides struct {
38353862
externalAuthConfigs []*externalauth.Config
38363863
templateScheduleStore *atomic.Pointer[schedule.TemplateScheduleStore]
38373864
userQuietHoursScheduleStore *atomic.Pointer[schedule.UserQuietHoursScheduleStore]
3865+
usageInserter *atomic.Pointer[usage.Inserter]
38383866
clock *quartz.Mock
38393867
acquireJobLongPollDuration time.Duration
38403868
heartbeatFn func(ctx context.Context) error
@@ -3855,6 +3883,7 @@ func setup(t *testing.T, ignoreLogErrors bool, ov *overrides) (proto.DRPCProvisi
38553883
var externalAuthConfigs []*externalauth.Config
38563884
tss := testTemplateScheduleStore()
38573885
uqhss := testUserQuietHoursScheduleStore()
3886+
usageInserter := testUsageInserter()
38583887
clock := quartz.NewReal()
38593888
pollDur := time.Duration(0)
38603889
if ov == nil {
@@ -3892,6 +3921,15 @@ func setup(t *testing.T, ignoreLogErrors bool, ov *overrides) (proto.DRPCProvisi
38923921
require.True(t, swapped)
38933922
}
38943923
}
3924+
if ov.usageInserter != nil {
3925+
tUsageInserter := usageInserter.Load()
3926+
// keep the initial test value if the override hasn't set the atomic pointer.
3927+
usageInserter = ov.usageInserter
3928+
if usageInserter.Load() == nil {
3929+
swapped := usageInserter.CompareAndSwap(nil, tUsageInserter)
3930+
require.True(t, swapped)
3931+
}
3932+
}
38953933
if ov.clock != nil {
38963934
clock = ov.clock
38973935
}
@@ -3947,6 +3985,7 @@ func setup(t *testing.T, ignoreLogErrors bool, ov *overrides) (proto.DRPCProvisi
39473985
auditPtr,
39483986
tss,
39493987
uqhss,
3988+
usageInserter,
39503989
deploymentValues,
39513990
provisionerdserver.Options{
39523991
ExternalAuthConfigs: externalAuthConfigs,
@@ -4061,3 +4100,22 @@ func (s *fakeStream) cancel() {
40614100
s.canceled = true
40624101
s.c.Broadcast()
40634102
}
4103+
4104+
type fakeUsageInserter struct {
4105+
collectedEvents []usage.Event
4106+
}
4107+
4108+
var _ usage.Inserter = &fakeUsageInserter{}
4109+
4110+
func newFakeUsageInserter() (*fakeUsageInserter, *atomic.Pointer[usage.Inserter]) {
4111+
ptr := &atomic.Pointer[usage.Inserter]{}
4112+
fake := &fakeUsageInserter{}
4113+
var inserter usage.Inserter = fake
4114+
ptr.Store(&inserter)
4115+
return fake, ptr
4116+
}
4117+
4118+
func (f *fakeUsageInserter) InsertDiscreteUsageEvent(_ context.Context, _ database.Store, event usage.DiscreteEvent) error {
4119+
f.collectedEvents = append(f.collectedEvents, event)
4120+
return nil
4121+
}

enterprise/cli/server.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/coder/coder/v2/enterprise/audit/backends"
2121
"github.com/coder/coder/v2/enterprise/coderd"
2222
"github.com/coder/coder/v2/enterprise/coderd/dormancy"
23+
"github.com/coder/coder/v2/enterprise/coderd/usage"
2324
"github.com/coder/coder/v2/enterprise/dbcrypt"
2425
"github.com/coder/coder/v2/enterprise/trialer"
2526
"github.com/coder/coder/v2/tailnet"
@@ -116,15 +117,57 @@ func (r *RootCmd) Server(_ func()) *serpent.Command {
116117
o.ExternalTokenEncryption = cs
117118
}
118119

120+
if o.LicenseKeys == nil {
121+
o.LicenseKeys = coderd.Keys
122+
}
123+
124+
multiCloser := &multiCloser{}
125+
126+
// Create the enterprise API.
119127
api, err := coderd.New(ctx, o)
120128
if err != nil {
121129
return nil, nil, err
122130
}
123-
return api.AGPL, api, nil
131+
multiCloser.Add(api)
132+
133+
// Start the enterprise usage publisher routine. This won't do anything
134+
// unless the deployment is licensed and one of the licenses has usage
135+
// publishing enabled.
136+
publisher := usage.NewTallymanPublisher(ctx, options.Logger, options.Database, o.LicenseKeys,
137+
usage.PublisherWithHTTPClient(api.HTTPClient),
138+
)
139+
err = publisher.Start()
140+
if err != nil {
141+
_ = multiCloser.Close()
142+
return nil, nil, xerrors.Errorf("start usage publisher: %w", err)
143+
}
144+
multiCloser.Add(publisher)
145+
146+
return api.AGPL, multiCloser, nil
124147
})
125148

126149
cmd.AddSubcommands(
127150
r.dbcryptCmd(),
128151
)
129152
return cmd
130153
}
154+
155+
type multiCloser struct {
156+
closers []io.Closer
157+
}
158+
159+
var _ io.Closer = &multiCloser{}
160+
161+
func (m *multiCloser) Add(closer io.Closer) {
162+
m.closers = append(m.closers, closer)
163+
}
164+
165+
func (m *multiCloser) Close() error {
166+
var mErr error
167+
for _, closer := range m.closers {
168+
if err := closer.Close(); err != nil {
169+
mErr = xerrors.Errorf("close %T: %w", closer, err)
170+
}
171+
}
172+
return mErr
173+
}

0 commit comments

Comments
 (0)