hub / github.com/coder/coder / newAIBridgeDaemon

Function newAIBridgeDaemon

cli/aibridged.go:33–78

newAIBridgeDaemon constructs the in-memory aibridge daemon and wires up a subscription that hot-reloads the provider pool from the database on every ai_providers change event. The returned unsubscribe function tears down the subscription; callers must invoke it alongside Server.Close on shutdown.

(coderAPI *coderd.API, providers []aibridge.Provider, cfg codersdk.AIBridgeConfig)
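The shutdown contract described above (call the returned unsubscribe function as well as Server.Close) can be illustrated with a minimal caller-side sketch. Everything here is an assumption made for illustration: `fakeServer` and `newFakeDaemon` are hypothetical stand-ins that only mimic the three-value return shape of newAIBridgeDaemon, and the order of the two teardown calls is an arbitrary choice, since the source only says they belong together.

```go
package main

import "fmt"

// fakeServer is a hypothetical stand-in for *aibridged.Server.
// Only Close is modeled; it records that it was called.
type fakeServer struct{ events *[]string }

func (s *fakeServer) Close() error {
	*s.events = append(*s.events, "closed")
	return nil
}

// newFakeDaemon is a hypothetical stand-in for newAIBridgeDaemon. It returns
// the same shape of values: a server, an unsubscribe function, and an error.
func newFakeDaemon(events *[]string) (*fakeServer, func(), error) {
	unsubscribe := func() { *events = append(*events, "unsubscribed") }
	return &fakeServer{events: events}, unsubscribe, nil
}

// runAndShutdown starts the stand-in daemon and then releases both resources.
// The teardown order is an assumption of this sketch.
func runAndShutdown() ([]string, error) {
	var events []string
	srv, unsubscribe, err := newFakeDaemon(&events)
	if err != nil {
		return nil, err
	}

	// ... serve until shutdown is requested ...

	unsubscribe() // tear down the change-event subscription
	if err := srv.Close(); err != nil {
		return events, err
	}
	return events, nil
}

func main() {
	events, err := runAndShutdown()
	fmt.Println(events, err)
}
```

Forgetting either call would leave one of the two resources alive, which is why the function returns them as a pair.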

Source

// function tears down the subscription; callers must invoke it
// alongside Server.Close on shutdown.
func newAIBridgeDaemon(coderAPI *coderd.API, providers []aibridge.Provider, cfg codersdk.AIBridgeConfig) (*aibridged.Server, func(), error) {
	ctx := context.Background()
	coderAPI.Logger.Debug(ctx, "starting in-memory aibridge daemon")

	logger := coderAPI.Logger.Named("aibridged")

	reg := prometheus.WrapRegistererWithPrefix("coder_aibridged_", coderAPI.PrometheusRegistry)
	metrics := aibridge.NewMetrics(reg)
	providerMetrics := aibridged.NewMetrics(reg)
	tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName)

	// Create pool for reusable stateful [aibridge.RequestBridge] instances (one per user).
	pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, logger.Named("pool"), metrics, tracer) // TODO: configurable size.
	if err != nil {
		return nil, nil, xerrors.Errorf("create request pool: %w", err)
	}

	// Subscribe to ai_providers change events so the pool tracks the
	// database without a restart. The boot-time `providers` snapshot
	// derives from env config and serves as a fallback if the database
	// load fails inside the reloader.
	reloader := &poolDBReloader{
		pool:    pool,
		db:      coderAPI.Database,
		cfg:     cfg,
		logger:  logger.Named("provider-loader"),
		metrics: providerMetrics,
	}
	unsubscribe, err := aibridged.SubscribeProviderReload(ctx, coderAPI.Pubsub, reloader, logger.Named("provider-reload"))
	if err != nil {
		// Pool is still usable with the boot-time snapshot; subscription
		// failure is logged but not fatal so the daemon still serves.
		logger.Warn(ctx, "subscribe to ai providers change channel", slog.Error(err))
		unsubscribe = func() {}
	}

	// Create daemon.
	srv, err := aibridged.New(ctx, pool, func(dialCtx context.Context) (aibridged.DRPCClient, error) {
		return coderAPI.CreateInMemoryAIBridgeServer(dialCtx)
	}, logger, tracer)
	if err != nil {
		unsubscribe()
		return nil, nil, xerrors.Errorf("start in-memory aibridge daemon: %w", err)
	}
	return srv, unsubscribe, nil
}

// poolDBReloader implements [aibridged.ProviderReloader] by loading
// the live provider set from the database and forwarding it to the

Callers 1

Server · Method · 0.85

Calls 11

NewMetrics · Function · 0.92
NewMetrics · Function · 0.92
NewCachedBridgePool · Function · 0.92
SubscribeProviderReload · Function · 0.92
New · Function · 0.92
unsubscribe · Function · 0.85
Named · Method · 0.80
Tracer · Method · 0.80
Errorf · Method · 0.45
Error · Method · 0.45

Tested by

no test coverage detected