()
| 327 | } |
| 328 | |
| 329 | func (c *BasicCoordination) respLoop() { |
| 330 | defer func() { |
| 331 | cErr := c.client.Close() |
| 332 | if cErr != nil { |
| 333 | c.logger.Debug(context.Background(), |
| 334 | "failed to close coordinate client after respLoop exit", slog.Error(cErr)) |
| 335 | } |
| 336 | c.coordinatee.SetAllPeersLost() |
| 337 | close(c.respLoopDone) |
| 338 | }() |
| 339 | for { |
| 340 | resp, err := c.client.Recv() |
| 341 | if err != nil { |
| 342 | c.logger.Debug(context.Background(), |
| 343 | "failed to read from protocol", slog.Error(err)) |
| 344 | c.SendErr(xerrors.Errorf("read: %w", err)) |
| 345 | return |
| 346 | } |
| 347 | |
| 348 | if resp.Error != "" { |
| 349 | // ReadyForHandshake error can occur during race conditions, where we send a ReadyForHandshake message, |
| 350 | // but the source has already disconnected from the tunnel by the time we do. So, just log at warning. |
| 351 | if strings.HasPrefix(resp.Error, ReadyForHandshakeError) { |
| 352 | c.logger.Warn(context.Background(), "coordination warning", slog.F("msg", resp.Error)) |
| 353 | } else { |
| 354 | c.logger.Error(context.Background(), |
| 355 | "coordination protocol error", slog.F("error", resp.Error)) |
| 356 | } |
| 357 | } |
| 358 | |
| 359 | err = c.coordinatee.UpdatePeers(resp.GetPeerUpdates()) |
| 360 | if err != nil { |
| 361 | c.logger.Debug(context.Background(), "failed to update peers", slog.Error(err)) |
| 362 | c.SendErr(xerrors.Errorf("update peers: %w", err)) |
| 363 | return |
| 364 | } |
| 365 | |
| 366 | // Only send ReadyForHandshake acks from peers without a target. |
| 367 | if c.sendAcks { |
| 368 | // Send an ack back for all received peers. This could |
| 369 | // potentially be smarter to only send an ACK once per client, |
| 370 | // but there's nothing currently stopping clients from reusing |
| 371 | // IDs. |
| 372 | rfh := []*proto.CoordinateRequest_ReadyForHandshake{} |
| 373 | for _, peer := range resp.GetPeerUpdates() { |
| 374 | if peer.Kind != proto.CoordinateResponse_PeerUpdate_NODE { |
| 375 | continue |
| 376 | } |
| 377 | |
| 378 | rfh = append(rfh, &proto.CoordinateRequest_ReadyForHandshake{Id: peer.Id}) |
| 379 | } |
| 380 | if len(rfh) > 0 { |
| 381 | err := c.SendRequest(&proto.CoordinateRequest{ |
| 382 | ReadyForHandshake: rfh, |
| 383 | }) |
| 384 | if err != nil { |
| 385 | c.logger.Debug(context.Background(), |
| 386 | "failed to send ready for handshake", slog.Error(err)) |
no test coverage detected