(t *testing.T)
| 1175 | } |
| 1176 | |
| 1177 | func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) { |
| 1178 | seedBroker := NewMockBroker(t, 1) |
| 1179 | coordinator := NewMockBroker(t, 2) |
| 1180 | |
| 1181 | replicas := []int32{coordinator.BrokerID()} |
| 1182 | metadataResponse1 := new(MetadataResponse) |
| 1183 | metadataResponse1.AddBroker(coordinator.Addr(), coordinator.BrokerID()) |
| 1184 | metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError) |
| 1185 | seedBroker.Returns(metadataResponse1) |
| 1186 | |
| 1187 | client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig()) |
| 1188 | if err != nil { |
| 1189 | t.Fatal(err) |
| 1190 | } |
| 1191 | |
| 1192 | coordinatorResponse1 := new(ConsumerMetadataResponse) |
| 1193 | coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable |
| 1194 | coordinator.Returns(coordinatorResponse1) |
| 1195 | |
| 1196 | coordinatorResponse2 := new(ConsumerMetadataResponse) |
| 1197 | coordinatorResponse2.CoordinatorID = coordinator.BrokerID() |
| 1198 | coordinatorResponse2.CoordinatorHost = "127.0.0.1" |
| 1199 | coordinatorResponse2.CoordinatorPort = coordinator.Port() |
| 1200 | |
| 1201 | coordinator.Returns(coordinatorResponse2) |
| 1202 | |
| 1203 | broker, err := client.Coordinator("my_group") |
| 1204 | if err != nil { |
| 1205 | t.Error(err) |
| 1206 | } |
| 1207 | |
| 1208 | if coordinator.Addr() != broker.Addr() { |
| 1209 | t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr()) |
| 1210 | } |
| 1211 | |
| 1212 | if coordinator.BrokerID() != broker.ID() { |
| 1213 | t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID()) |
| 1214 | } |
| 1215 | |
| 1216 | // Grab the cached value |
| 1217 | broker2, err := client.Coordinator("my_group") |
| 1218 | if err != nil { |
| 1219 | t.Error(err) |
| 1220 | } |
| 1221 | |
| 1222 | if broker2.Addr() != broker.Addr() { |
| 1223 | t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr()) |
| 1224 | } |
| 1225 | |
| 1226 | coordinator.Close() |
| 1227 | seedBroker.Close() |
| 1228 | safeClose(t, client) |
| 1229 | } |
| 1230 | |
| 1231 | func TestClientCoordinatorChangeWithConsumerOffsetsTopic(t *testing.T) { |
| 1232 | seedBroker := NewMockBroker(t, 1) |
nothing calls this directly
no test coverage detected