hub / github.com/coder/coder / CreateInMemoryAIBridgeServer

Method CreateInMemoryAIBridgeServer

coderd/aibridged.go:56–122

CreateInMemoryAIBridgeServer creates an [aibridged.DRPCServer] and returns an [aibridged.DRPCClient] connected to it over an in-memory transport. The server provides the Coder-specific functionality that aibridged requires, such as persistence and configuration retrieval.

(dialCtx context.Context)

Source from the content-addressed store, hash-verified

// This server is responsible for all the Coder-specific functionality that aibridged
// requires such as persistence and retrieving configuration.
func (api *API) CreateInMemoryAIBridgeServer(dialCtx context.Context) (client aibridged.DRPCClient, err error) {
	// TODO(dannyk): implement options.
	// TODO(dannyk): implement tracing.
	// TODO(dannyk): implement API versioning.

	clientSession, serverSession := drpcsdk.MemTransportPipe()
	defer func() {
		if err != nil {
			_ = clientSession.Close()
			_ = serverSession.Close()
		}
	}()

	mux := drpcmux.New()
	srv, err := aibridgedserver.NewServer(api.ctx, api.Database, api.Logger.Named("aibridgedserver"),
		api.AccessURL.String(), api.DeploymentValues.AI.BridgeConfig, api.ExternalAuthConfigs, api.Experiments, api.AISeatTracker)
	if err != nil {
		return nil, err
	}
	err = aibridgedproto.DRPCRegisterRecorder(mux, srv)
	if err != nil {
		return nil, xerrors.Errorf("register recorder service: %w", err)
	}
	err = aibridgedproto.DRPCRegisterMCPConfigurator(mux, srv)
	if err != nil {
		return nil, xerrors.Errorf("register MCP configurator service: %w", err)
	}
	err = aibridgedproto.DRPCRegisterAuthorizer(mux, srv)
	if err != nil {
		return nil, xerrors.Errorf("register key validator service: %w", err)
	}
	server := drpcserver.NewWithOptions(&tracing.DRPCHandler{Handler: mux},
		drpcserver.Options{
			Manager: drpcsdk.DefaultDRPCOptions(nil),
			Log: func(err error) {
				if errors.Is(err, io.EOF) {
					return
				}
				api.Logger.Debug(dialCtx, "aibridged drpc server error", slog.Error(err))
			},
		},
	)
	// in-mem pipes aren't technically "websockets" but they have the same properties as far as the
	// API is concerned: they are long-lived connections that we need to close before completing
	// shutdown of the API.
	api.WebsocketWaitMutex.Lock()
	api.WebsocketWaitGroup.Add(1)
	api.WebsocketWaitMutex.Unlock()
	go func() {
		defer api.WebsocketWaitGroup.Done()
		// Here we pass the background context, since we want the server to keep serving until the
		// client hangs up. The aibridged is local, in-mem, so there isn't a danger of losing contact with it and
		// having a dead connection we don't know the status of.
		err := server.Serve(context.Background(), serverSession)
		api.Logger.Info(dialCtx, "aibridge daemon disconnected", slog.Error(err))
		// Close the sessions, so we don't leak goroutines serving them.
		_ = clientSession.Close()
		_ = serverSession.Close()

Callers 5

newAIBridgeDaemon · Function · 0.80
TestIntegration · Function · 0.80
startTestAIBridgeDaemon · Function · 0.80

Calls 15

MemTransportPipe · Function · 0.92
NewServer · Function · 0.92
DefaultDRPCOptions · Function · 0.92
Named · Method · 0.80
Close · Method · 0.65
New · Method · 0.65
Add · Method · 0.65
String · Method · 0.45
Errorf · Method · 0.45
Is · Method · 0.45
Error · Method · 0.45
Lock · Method · 0.45

Tested by 4

TestIntegration · Function · 0.64
startTestAIBridgeDaemon · Function · 0.64