MCPcopy Index your code
hub / github.com/coder/coder / respLoop

Method respLoop

tailnet/controllers.go:329–393  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

327}
328
329func (c *BasicCoordination) respLoop() {
330 defer func() {
331 cErr := c.client.Close()
332 if cErr != nil {
333 c.logger.Debug(context.Background(),
334 "failed to close coordinate client after respLoop exit", slog.Error(cErr))
335 }
336 c.coordinatee.SetAllPeersLost()
337 close(c.respLoopDone)
338 }()
339 for {
340 resp, err := c.client.Recv()
341 if err != nil {
342 c.logger.Debug(context.Background(),
343 "failed to read from protocol", slog.Error(err))
344 c.SendErr(xerrors.Errorf("read: %w", err))
345 return
346 }
347
348 if resp.Error != "" {
349 // ReadyForHandshake error can occur during race conditions, where we send a ReadyForHandshake message,
350 // but the source has already disconnected from the tunnel by the time we do. So, just log at warning.
351 if strings.HasPrefix(resp.Error, ReadyForHandshakeError) {
352 c.logger.Warn(context.Background(), "coordination warning", slog.F("msg", resp.Error))
353 } else {
354 c.logger.Error(context.Background(),
355 "coordination protocol error", slog.F("error", resp.Error))
356 }
357 }
358
359 err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates())
360 if err != nil {
361 c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err))
362 c.SendErr(xerrors.Errorf("update peers: %w", err))
363 return
364 }
365
366 // Only send ReadyForHandshake acks from peers without a target.
367 if c.sendAcks {
368 // Send an ack back for all received peers. This could
369 // potentially be smarter to only send an ACK once per client,
370 // but there's nothing currently stopping clients from reusing
371 // IDs.
372 rfh := []*proto.CoordinateRequest_ReadyForHandshake{}
373 for _, peer := range resp.GetPeerUpdates() {
374 if peer.Kind != proto.CoordinateResponse_PeerUpdate_NODE {
375 continue
376 }
377
378 rfh = append(rfh, &proto.CoordinateRequest_ReadyForHandshake{Id: peer.Id})
379 }
380 if len(rfh) > 0 {
381 err := c.SendRequest(&proto.CoordinateRequest{
382 ReadyForHandshake: rfh,
383 })
384 if err != nil {
385 c.logger.Debug(context.Background(),
386 "failed to send ready for handshake", slog.Error(err))

Callers 1

NewCoordinationMethod · 0.95

Calls 9

SendErrMethod · 0.95
SendRequestMethod · 0.95
GetPeerUpdatesMethod · 0.80
CloseMethod · 0.65
SetAllPeersLostMethod · 0.65
RecvMethod · 0.65
UpdatePeersMethod · 0.65
ErrorMethod · 0.45
ErrorfMethod · 0.45

Tested by

no test coverage detected