MCPcopy
hub / github.com/IBM/sarama / TestClientCoordinatorWithConsumerOffsetsTopic

Function TestClientCoordinatorWithConsumerOffsetsTopic

client_test.go:1177–1229  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1175}
1176
1177func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
1178 seedBroker := NewMockBroker(t, 1)
1179 coordinator := NewMockBroker(t, 2)
1180
1181 replicas := []int32{coordinator.BrokerID()}
1182 metadataResponse1 := new(MetadataResponse)
1183 metadataResponse1.AddBroker(coordinator.Addr(), coordinator.BrokerID())
1184 metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
1185 seedBroker.Returns(metadataResponse1)
1186
1187 client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
1188 if err != nil {
1189 t.Fatal(err)
1190 }
1191
1192 coordinatorResponse1 := new(ConsumerMetadataResponse)
1193 coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
1194 coordinator.Returns(coordinatorResponse1)
1195
1196 coordinatorResponse2 := new(ConsumerMetadataResponse)
1197 coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
1198 coordinatorResponse2.CoordinatorHost = "127.0.0.1"
1199 coordinatorResponse2.CoordinatorPort = coordinator.Port()
1200
1201 coordinator.Returns(coordinatorResponse2)
1202
1203 broker, err := client.Coordinator("my_group")
1204 if err != nil {
1205 t.Error(err)
1206 }
1207
1208 if coordinator.Addr() != broker.Addr() {
1209 t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
1210 }
1211
1212 if coordinator.BrokerID() != broker.ID() {
1213 t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
1214 }
1215
1216 // Grab the cached value
1217 broker2, err := client.Coordinator("my_group")
1218 if err != nil {
1219 t.Error(err)
1220 }
1221
1222 if broker2.Addr() != broker.Addr() {
1223 t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
1224 }
1225
1226 coordinator.Close()
1227 seedBroker.Close()
1228 safeClose(t, client)
1229}
1230
1231func TestClientCoordinatorChangeWithConsumerOffsetsTopic(t *testing.T) {
1232 seedBroker := NewMockBroker(t, 1)

Callers

nothing calls this directly

Calls 15

BrokerIDMethod · 0.95
AddrMethod · 0.95
ReturnsMethod · 0.95
PortMethod · 0.95
CoordinatorMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80
IDMethod · 0.80
NewClientFunction · 0.70
NewTestConfigFunction · 0.70

Tested by

no test coverage detected