MCPcopy Index your code
hub / github.com/coder/coder / watchWorkspaceAgentMetadata

Method watchWorkspaceAgentMetadata

coderd/workspaceagents.go:1706–1915  ·  view source on GitHub ↗
(
	rw http.ResponseWriter,
	r *http.Request,
	connect httpapi.EventSender,
)

Source from the content-addressed store, hash-verified

1704}
1705
1706func (api *API) watchWorkspaceAgentMetadata(
1707 rw http.ResponseWriter,
1708 r *http.Request,
1709 connect httpapi.EventSender,
1710) {
1711 // Allow us to interrupt watch via cancel.
1712 ctx, cancel := context.WithCancel(r.Context())
1713 defer cancel()
1714 r = r.WithContext(ctx) // Rewire context for SSE cancellation.
1715
1716 waws := httpmw.WorkspaceAgentAndWorkspaceParam(r)
1717 agentIDEncoded := make([]byte, metadatabatcher.UUIDBase64Size)
1718 err := metadatabatcher.EncodeAgentID(waws.WorkspaceAgent.ID, agentIDEncoded)
1719 if err != nil {
1720 httpapi.InternalServerError(rw, err)
1721 return
1722 }
1723 log := api.Logger.Named("workspace_metadata_watcher").With(
1724 slog.F("workspace_agent_id", waws.WorkspaceAgent.ID),
1725 slog.F("workspace_id", waws.WorkspaceTable.ID),
1726 )
1727
1728 // Send metadata on updates, we must ensure subscription before sending
1729 // initial metadata to guarantee that events in-between are not missed.
1730 // The channel carries no data - it's just a signal to fetch all metadata.
1731 update := make(chan struct{}, 1)
1732
1733 // Subscribe to the global batched metadata channel.
1734 // The batcher publishes only to this channel to achieve O(1) NOTIFY scaling.
1735 cancelBatchSub, err := api.Pubsub.Subscribe(metadatabatcher.MetadataBatchPubsubChannel, func(_ context.Context, byt []byte) {
1736 if ctx.Err() != nil {
1737 return
1738 }
1739
1740 if len(byt)%metadatabatcher.UUIDBase64Size != 0 {
1741 log.Error(ctx, "invalid batched pubsub message, pubsub message length was not a multiple of encoded agent UUID length", slog.Error(err))
1742 return
1743 }
1744
1745 // Compare each encoded agentID to our encoded agent ID.
1746 for i := 0; i < len(byt); i += metadatabatcher.UUIDBase64Size {
1747 if !bytes.Equal(byt[i:i+metadatabatcher.UUIDBase64Size], agentIDEncoded) {
1748 continue
1749 }
1750
1751 log.Debug(ctx, "received metadata update from batch channel",
1752 slog.F("agent_id", waws.WorkspaceAgent.ID),
1753 slog.F("batch_size", len(byt)/metadatabatcher.UUIDBase64Size),
1754 )
1755
1756 // Signal to re-fetch all metadata for this agent.
1757 // Batch notifications don't include which keys changed, so we
1758 // always fetch all keys for this agent.
1759 // Attempt to read from the channel first so that we do not block on the write.
1760 select {
1761 case <-update:
1762 default:
1763 }

Calls 15

EncodeAgentID (Function) · 0.92
InternalServerError (Function) · 0.92
Write (Function) · 0.92
RequestLoggerFromContext (Function) · 0.92
IsQueryCanceledError (Function) · 0.92
WithContext (Method) · 0.80
Named (Method) · 0.80
Err (Method) · 0.80
Context (Method) · 0.65
Subscribe (Method) · 0.65

Tested by

no test coverage detected