(ctx context.Context, cutoff time.Duration)
| 480 | } |
| 481 | |
| 482 | func (m *MultiAgentController) doExpireOldAgents(ctx context.Context, cutoff time.Duration) { |
| 483 | // TODO: add some attrs to this. |
| 484 | ctx, span := m.tracer.Start(ctx, tracing.FuncName()) |
| 485 | defer span.End() |
| 486 | |
| 487 | start := time.Now() |
| 488 | deletedCount := 0 |
| 489 | |
| 490 | m.mu.Lock() |
| 491 | defer m.mu.Unlock() |
| 492 | m.logger.Debug(ctx, "pruning inactive agents", slog.F("agent_count", len(m.connectionTimes))) |
| 493 | for agentID, lastConnection := range m.connectionTimes { |
| 494 | // If no one has connected since the cutoff and there are no active |
| 495 | // connections, remove the agent. |
| 496 | if time.Since(lastConnection) > cutoff && len(m.tickets[agentID]) == 0 { |
| 497 | if m.coordination != nil { |
| 498 | err := m.coordination.SendRequest(&proto.CoordinateRequest{ |
| 499 | RemoveTunnel: &proto.CoordinateRequest_Tunnel{Id: agentID[:]}, |
| 500 | }) |
| 501 | if err != nil { |
| 502 | m.logger.Debug(ctx, "unsubscribe expired agent", slog.Error(err), slog.F("agent_id", agentID)) |
| 503 | m.coordination.SendErr(xerrors.Errorf("unsubscribe expired agent: %w", err)) |
| 504 | // close the client because we do not want to do a graceful disconnect by |
| 505 | // closing the coordination. |
| 506 | _ = m.coordination.CloseClient() |
| 507 | m.coordination = nil |
| 508 | // Here we continue deleting any inactive agents: there is no point in |
| 509 | // re-establishing tunnels to expired agents when we eventually reconnect. |
| 510 | } |
| 511 | } |
| 512 | deletedCount++ |
| 513 | delete(m.connectionTimes, agentID) |
| 514 | } |
| 515 | } |
| 516 | m.logger.Debug(ctx, "pruned inactive agents", |
| 517 | slog.F("deleted", deletedCount), |
| 518 | slog.F("took", time.Since(start)), |
| 519 | ) |
| 520 | } |
| 521 | |
| 522 | func (m *MultiAgentController) Close() { |
| 523 | m.cancel() |
no test coverage detected