| 1180 | } |
| 1181 | |
| 1182 | func (cg *ConsumerGroup) makeAssignments(assignments map[string][]int32, offsets map[string]map[int]int64) map[string][]PartitionAssignment { |
| 1183 | topicAssignments := make(map[string][]PartitionAssignment) |
| 1184 | for _, topic := range cg.config.Topics { |
| 1185 | topicPartitions := assignments[topic] |
| 1186 | topicAssignments[topic] = make([]PartitionAssignment, 0, len(topicPartitions)) |
| 1187 | for _, partition := range topicPartitions { |
| 1188 | var offset int64 |
| 1189 | partitionOffsets, ok := offsets[topic] |
| 1190 | if ok { |
| 1191 | offset, ok = partitionOffsets[int(partition)] |
| 1192 | } |
| 1193 | if !ok { |
| 1194 | offset = cg.config.StartOffset |
| 1195 | } |
| 1196 | topicAssignments[topic] = append(topicAssignments[topic], PartitionAssignment{ |
| 1197 | ID: int(partition), |
| 1198 | Offset: offset, |
| 1199 | }) |
| 1200 | } |
| 1201 | } |
| 1202 | return topicAssignments |
| 1203 | } |
| 1204 | |
| 1205 | func (cg *ConsumerGroup) leaveGroup(memberID string) error { |
| 1206 | // don't attempt to leave the group if no memberID was ever assigned. |