| 3200 | } |
| 3201 | |
| 3202 | func waitForUpdates( |
| 3203 | t *testing.T, |
| 3204 | //nolint:revive // t takes precedence |
| 3205 | ctx context.Context, |
| 3206 | stream tailnetproto.DRPCTailnet_WorkspaceUpdatesClient, |
| 3207 | currentState map[uuid.UUID]workspace, |
| 3208 | expectedState map[uuid.UUID]workspace, |
| 3209 | ) { |
| 3210 | t.Helper() |
| 3211 | errCh := make(chan error, 1) |
| 3212 | go func() { |
| 3213 | for { |
| 3214 | select { |
| 3215 | case <-ctx.Done(): |
| 3216 | errCh <- ctx.Err() |
| 3217 | return |
| 3218 | default: |
| 3219 | } |
| 3220 | update, err := stream.Recv() |
| 3221 | if err != nil { |
| 3222 | errCh <- err |
| 3223 | return |
| 3224 | } |
| 3225 | for _, ws := range update.UpsertedWorkspaces { |
| 3226 | id, err := uuid.FromBytes(ws.Id) |
| 3227 | if err != nil { |
| 3228 | errCh <- err |
| 3229 | return |
| 3230 | } |
| 3231 | currentState[id] = workspace{ |
| 3232 | Status: ws.Status, |
| 3233 | NumAgents: currentState[id].NumAgents, |
| 3234 | } |
| 3235 | } |
| 3236 | for _, ws := range update.DeletedWorkspaces { |
| 3237 | id, err := uuid.FromBytes(ws.Id) |
| 3238 | if err != nil { |
| 3239 | errCh <- err |
| 3240 | return |
| 3241 | } |
| 3242 | currentState[id] = workspace{ |
| 3243 | Status: tailnetproto.Workspace_DELETED, |
| 3244 | NumAgents: currentState[id].NumAgents, |
| 3245 | } |
| 3246 | } |
| 3247 | for _, a := range update.UpsertedAgents { |
| 3248 | id, err := uuid.FromBytes(a.WorkspaceId) |
| 3249 | if err != nil { |
| 3250 | errCh <- err |
| 3251 | return |
| 3252 | } |
| 3253 | currentState[id] = workspace{ |
| 3254 | Status: currentState[id].Status, |
| 3255 | NumAgents: currentState[id].NumAgents + 1, |
| 3256 | } |
| 3257 | } |
| 3258 | for _, a := range update.DeletedAgents { |
| 3259 | id, err := uuid.FromBytes(a.WorkspaceId) |