mappingQuery queries the database for all the mappings that the given peers should know about, that is, all the peers that it shares a tunnel with and their current node mappings (if they exist). It then sends the mapping snapshot to the corresponding mapper, where it will get transmitted to the pe
(peers []mKey)
| 1102 | // exist). It then sends the mapping snapshot to the corresponding mapper, where it will get |
| 1103 | // transmitted to the peer. |
| 1104 | func (q *querier) mappingQuery(peers []mKey) error { |
| 1105 | // Filter to peers with active mappers before hitting the DB. |
| 1106 | q.mu.Lock() |
| 1107 | active := make([]uuid.UUID, 0, len(peers)) |
| 1108 | activeKeys := make([]mKey, 0, len(peers)) |
| 1109 | for _, p := range peers { |
| 1110 | if _, ok := q.mappers[p]; ok { |
| 1111 | active = append(active, uuid.UUID(p)) |
| 1112 | activeKeys = append(activeKeys, p) |
| 1113 | } |
| 1114 | } |
| 1115 | q.mu.Unlock() |
| 1116 | if len(active) == 0 { |
| 1117 | q.logger.Debug(q.ctx, "batch mapping query: no active mappers") |
| 1118 | return nil |
| 1119 | } |
| 1120 | |
| 1121 | q.logger.Debug(q.ctx, "batch querying mappings", |
| 1122 | slog.F("num_peers", len(active))) |
| 1123 | bindings, err := q.store.GetTailnetTunnelPeerBindingsBatch(q.ctx, active) |
| 1124 | if err != nil && !xerrors.Is(err, sql.ErrNoRows) { |
| 1125 | return xerrors.Errorf("get tunnel peer bindings batch: %w", err) |
| 1126 | } |
| 1127 | q.logger.Debug(q.ctx, "batch queried mappings", |
| 1128 | slog.F("num_bindings", len(bindings))) |
| 1129 | |
| 1130 | // Group bindings by lookup_id (the peer that needs the mapping). |
| 1131 | grouped := make(map[uuid.UUID][]database.GetTailnetTunnelPeerBindingsBatchRow) |
| 1132 | for _, b := range bindings { |
| 1133 | grouped[b.LookupID] = append(grouped[b.LookupID], b) |
| 1134 | } |
| 1135 | |
| 1136 | // Dispatch each peer's mappings to its mapper. |
| 1137 | for _, mk := range activeKeys { |
| 1138 | peerID := uuid.UUID(mk) |
| 1139 | rows := grouped[peerID] |
| 1140 | mappings, err := q.bindingsToMappings(rows) |
| 1141 | if err != nil { |
| 1142 | q.logger.Error(q.ctx, "failed to convert batch mappings", |
| 1143 | slog.F("peer_id", peerID), slog.Error(err)) |
| 1144 | continue |
| 1145 | } |
| 1146 | q.mu.Lock() |
| 1147 | mpr, ok := q.mappers[mk] |
| 1148 | q.mu.Unlock() |
| 1149 | if !ok { |
| 1150 | continue |
| 1151 | } |
| 1152 | if err := agpl.SendCtx(mpr.ctx, mpr.mappings, mappings); err != nil { |
| 1153 | q.logger.Debug(q.ctx, "failed to send mappings to peer", |
| 1154 | slog.F("peer_id", peerID), slog.Error(err)) |
| 1155 | continue |
| 1156 | } |
| 1157 | } |
| 1158 | return nil |
| 1159 | } |
| 1160 | |
| 1161 | // bindingsToMappings converts binding rows to mappings. |
no test coverage detected