( rw http.ResponseWriter, r *http.Request, connect httpapi.EventSender, )
| 1704 | } |
| 1705 | |
| 1706 | func (api *API) watchWorkspaceAgentMetadata( |
| 1707 | rw http.ResponseWriter, |
| 1708 | r *http.Request, |
| 1709 | connect httpapi.EventSender, |
| 1710 | ) { |
| 1711 | // Allow us to interrupt watch via cancel. |
| 1712 | ctx, cancel := context.WithCancel(r.Context()) |
| 1713 | defer cancel() |
| 1714 | r = r.WithContext(ctx) // Rewire context for SSE cancellation. |
| 1715 | |
| 1716 | waws := httpmw.WorkspaceAgentAndWorkspaceParam(r) |
| 1717 | agentIDEncoded := make([]byte, metadatabatcher.UUIDBase64Size) |
| 1718 | err := metadatabatcher.EncodeAgentID(waws.WorkspaceAgent.ID, agentIDEncoded) |
| 1719 | if err != nil { |
| 1720 | httpapi.InternalServerError(rw, err) |
| 1721 | return |
| 1722 | } |
| 1723 | log := api.Logger.Named("workspace_metadata_watcher").With( |
| 1724 | slog.F("workspace_agent_id", waws.WorkspaceAgent.ID), |
| 1725 | slog.F("workspace_id", waws.WorkspaceTable.ID), |
| 1726 | ) |
| 1727 | |
| 1728 | // Send metadata on updates. We must subscribe before sending the |
| 1729 | // initial metadata to guarantee that events in between are not missed. |
| 1730 | // The channel carries no data - it's just a signal to fetch all metadata. |
| 1731 | update := make(chan struct{}, 1) |
| 1732 | |
| 1733 | // Subscribe to the global batched metadata channel. |
| 1734 | // The batcher publishes only to this channel to achieve O(1) NOTIFY scaling. |
| 1735 | cancelBatchSub, err := api.Pubsub.Subscribe(metadatabatcher.MetadataBatchPubsubChannel, func(_ context.Context, byt []byte) { |
| 1736 | if ctx.Err() != nil { |
| 1737 | return |
| 1738 | } |
| 1739 | |
| 1740 | if len(byt)%metadatabatcher.UUIDBase64Size != 0 { |
| 1741 | log.Error(ctx, "invalid batched pubsub message, pubsub message length was not a multiple of encoded agent UUID length", slog.Error(err)) |
| 1742 | return |
| 1743 | } |
| 1744 | |
| 1745 | // Compare each encoded agentID to our encoded agent ID. |
| 1746 | for i := 0; i < len(byt); i += metadatabatcher.UUIDBase64Size { |
| 1747 | if !bytes.Equal(byt[i:i+metadatabatcher.UUIDBase64Size], agentIDEncoded) { |
| 1748 | continue |
| 1749 | } |
| 1750 | |
| 1751 | log.Debug(ctx, "received metadata update from batch channel", |
| 1752 | slog.F("agent_id", waws.WorkspaceAgent.ID), |
| 1753 | slog.F("batch_size", len(byt)/metadatabatcher.UUIDBase64Size), |
| 1754 | ) |
| 1755 | |
| 1756 | // Signal to re-fetch all metadata for this agent. |
| 1757 | // Batch notifications don't include which keys changed, so we |
| 1758 | // always fetch all keys for this agent. |
| 1759 | // Attempt to read from the channel first so that we do not block on the write. |
| 1760 | select { |
| 1761 | case <-update: |
| 1762 | default: |
| 1763 | } |
no test coverage detected