newAIBridgeDaemon constructs the in-memory aibridge daemon and wires up a subscription that hot-reloads the provider pool from the database on every ai_providers change event. The returned unsubscribe function tears down the subscription; callers must invoke it alongside Server.Close on shutdown.
(coderAPI *coderd.API, providers []aibridge.Provider, cfg codersdk.AIBridgeConfig)
| 31 | // function tears down the subscription; callers must invoke it |
| 32 | // alongside Server.Close on shutdown. |
| 33 | func newAIBridgeDaemon(coderAPI *coderd.API, providers []aibridge.Provider, cfg codersdk.AIBridgeConfig) (*aibridged.Server, func(), error) { |
| 34 | ctx := context.Background() |
| 35 | coderAPI.Logger.Debug(ctx, "starting in-memory aibridge daemon") |
| 36 | |
| 37 | logger := coderAPI.Logger.Named("aibridged") |
| 38 | |
| 39 | reg := prometheus.WrapRegistererWithPrefix("coder_aibridged_", coderAPI.PrometheusRegistry) |
| 40 | metrics := aibridge.NewMetrics(reg) |
| 41 | providerMetrics := aibridged.NewMetrics(reg) |
| 42 | tracer := coderAPI.TracerProvider.Tracer(tracing.TracerName) |
| 43 | |
| 44 | // Create pool for reusable stateful [aibridge.RequestBridge] instances (one per user). |
| 45 | pool, err := aibridged.NewCachedBridgePool(aibridged.DefaultPoolOptions, providers, logger.Named("pool"), metrics, tracer) // TODO: configurable size. |
| 46 | if err != nil { |
| 47 | return nil, nil, xerrors.Errorf("create request pool: %w", err) |
| 48 | } |
| 49 | |
| 50 | // Subscribe to ai_providers change events so the pool tracks the |
| 51 | // database without a restart. The boot-time `providers` snapshot |
| 52 | // derives from env config and serves as a fallback if the database |
| 53 | // load fails inside the reloader. |
| 54 | reloader := &poolDBReloader{ |
| 55 | pool: pool, |
| 56 | db: coderAPI.Database, |
| 57 | cfg: cfg, |
| 58 | logger: logger.Named("provider-loader"), |
| 59 | metrics: providerMetrics, |
| 60 | } |
| 61 | unsubscribe, err := aibridged.SubscribeProviderReload(ctx, coderAPI.Pubsub, reloader, logger.Named("provider-reload")) |
| 62 | if err != nil { |
| 63 | // Pool is still usable with the boot-time snapshot; subscription |
| 64 | // failure is logged but not fatal so the daemon still serves. |
| 65 | logger.Warn(ctx, "subscribe to ai providers change channel", slog.Error(err)) |
| 66 | unsubscribe = func() {} |
| 67 | } |
| 68 | |
| 69 | // Create daemon. |
| 70 | srv, err := aibridged.New(ctx, pool, func(dialCtx context.Context) (aibridged.DRPCClient, error) { |
| 71 | return coderAPI.CreateInMemoryAIBridgeServer(dialCtx) |
| 72 | }, logger, tracer) |
| 73 | if err != nil { |
| 74 | unsubscribe() |
| 75 | return nil, nil, xerrors.Errorf("start in-memory aibridge daemon: %w", err) |
| 76 | } |
| 77 | return srv, unsubscribe, nil |
| 78 | } |
| 79 | |
| 80 | // poolDBReloader implements [aibridged.ProviderReloader] by loading |
| 81 | // the live provider set from the database and forwarding it to the |
no test coverage detected