findMembersByTopic groups the memberGroupMetadata by topic.
(members []GroupMember)
| 299 | |
| 300 | // findMembersByTopic groups the memberGroupMetadata by topic. |
| 301 | func findMembersByTopic(members []GroupMember) map[string][]GroupMember { |
| 302 | membersByTopic := map[string][]GroupMember{} |
| 303 | for _, member := range members { |
| 304 | for _, topic := range member.Topics { |
| 305 | membersByTopic[topic] = append(membersByTopic[topic], member) |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | // normalize ordering of members to enabling grouping across topics by partitions |
| 310 | // |
| 311 | // Want: |
| 312 | // C0 [T0/P0, T1/P0] |
| 313 | // C1 [T0/P1, T1/P1] |
| 314 | // |
| 315 | // Not: |
| 316 | // C0 [T0/P0, T1/P1] |
| 317 | // C1 [T0/P1, T1/P0] |
| 318 | // |
| 319 | // Even though the later is still round robin, the partitions are crossed |
| 320 | // |
| 321 | for _, members := range membersByTopic { |
| 322 | sort.Slice(members, func(i, j int) bool { |
| 323 | return members[i].ID < members[j].ID |
| 324 | }) |
| 325 | } |
| 326 | |
| 327 | return membersByTopic |
| 328 | } |
| 329 | |
| 330 | // findGroupBalancer returns the GroupBalancer with the specified protocolName |
| 331 | // from the slice provided. |
no outgoing calls