CreateInMemoryAIBridgeServer creates a [aibridged.DRPCServer] and returns a [aibridged.DRPCClient] to it, connected over an in-memory transport. This server is responsible for all the Coder-specific functionality that aibridged requires such as persistence and retrieving configuration.
(dialCtx context.Context)
| 54 | // This server is responsible for all the Coder-specific functionality that aibridged |
| 55 | // requires such as persistence and retrieving configuration. |
| 56 | func (api *API) CreateInMemoryAIBridgeServer(dialCtx context.Context) (client aibridged.DRPCClient, err error) { |
| 57 | // TODO(dannyk): implement options. |
| 58 | // TODO(dannyk): implement tracing. |
| 59 | // TODO(dannyk): implement API versioning. |
| 60 | |
| 61 | clientSession, serverSession := drpcsdk.MemTransportPipe() |
| 62 | defer func() { |
| 63 | if err != nil { |
| 64 | _ = clientSession.Close() |
| 65 | _ = serverSession.Close() |
| 66 | } |
| 67 | }() |
| 68 | |
| 69 | mux := drpcmux.New() |
| 70 | srv, err := aibridgedserver.NewServer(api.ctx, api.Database, api.Logger.Named("aibridgedserver"), |
| 71 | api.AccessURL.String(), api.DeploymentValues.AI.BridgeConfig, api.ExternalAuthConfigs, api.Experiments, api.AISeatTracker) |
| 72 | if err != nil { |
| 73 | return nil, err |
| 74 | } |
| 75 | err = aibridgedproto.DRPCRegisterRecorder(mux, srv) |
| 76 | if err != nil { |
| 77 | return nil, xerrors.Errorf("register recorder service: %w", err) |
| 78 | } |
| 79 | err = aibridgedproto.DRPCRegisterMCPConfigurator(mux, srv) |
| 80 | if err != nil { |
| 81 | return nil, xerrors.Errorf("register MCP configurator service: %w", err) |
| 82 | } |
| 83 | err = aibridgedproto.DRPCRegisterAuthorizer(mux, srv) |
| 84 | if err != nil { |
| 85 | return nil, xerrors.Errorf("register key validator service: %w", err) |
| 86 | } |
| 87 | server := drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux}, |
| 88 | drpcserver.Options{ |
| 89 | Manager: drpcsdk.DefaultDRPCOptions(nil), |
| 90 | Log: func(err error) { |
| 91 | if errors.Is(err, io.EOF) { |
| 92 | return |
| 93 | } |
| 94 | api.Logger.Debug(dialCtx, "aibridged drpc server error", slog.Error(err)) |
| 95 | }, |
| 96 | }, |
| 97 | ) |
| 98 | // in-mem pipes aren't technically "websockets" but they have the same properties as far as the |
| 99 | // API is concerned: they are long-lived connections that we need to close before completing |
| 100 | // shutdown of the API. |
| 101 | api.WebsocketWaitMutex.Lock() |
| 102 | api.WebsocketWaitGroup.Add(1) |
| 103 | api.WebsocketWaitMutex.Unlock() |
| 104 | go func() { |
| 105 | defer api.WebsocketWaitGroup.Done() |
| 106 | // Here we pass the background context, since we want the server to keep serving until the |
| 107 | // client hangs up. The aibridged is local, in-mem, so there isn't a danger of losing contact with it and |
| 108 | // having a dead connection we don't know the status of. |
| 109 | err := server.Serve(context.Background(), serverSession) |
| 110 | api.Logger.Info(dialCtx, "aibridge daemon disconnected", slog.Error(err)) |
| 111 | // Close the sessions, so we don't leak goroutines serving them. |
| 112 | _ = clientSession.Close() |
| 113 | _ = serverSession.Close() |