An attempt to consume the same partition twice should fail.
(t *testing.T)
| 373 | |
| 374 | // An attempt to consume the same partition twice should fail. |
| 375 | func TestConsumerDuplicate(t *testing.T) { |
| 376 | // Given |
| 377 | broker0 := NewMockBroker(t, 0) |
| 378 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 379 | "MetadataRequest": NewMockMetadataResponse(t). |
| 380 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 381 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 382 | "OffsetRequest": NewMockOffsetResponse(t). |
| 383 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 384 | SetOffset("my_topic", 0, OffsetNewest, 1000), |
| 385 | "FetchRequest": NewMockFetchResponse(t, 1), |
| 386 | }) |
| 387 | |
| 388 | config := NewTestConfig() |
| 389 | config.ChannelBufferSize = 0 |
| 390 | c, err := NewConsumer([]string{broker0.Addr()}, config) |
| 391 | if err != nil { |
| 392 | t.Fatal(err) |
| 393 | } |
| 394 | |
| 395 | pc1, err := c.ConsumePartition("my_topic", 0, 0) |
| 396 | if err != nil { |
| 397 | t.Fatal(err) |
| 398 | } |
| 399 | |
| 400 | // When |
| 401 | pc2, err := c.ConsumePartition("my_topic", 0, 0) |
| 402 | |
| 403 | // Then |
| 404 | var target ConfigurationError |
| 405 | ok := errors.As(err, &target) |
| 406 | if pc2 != nil || !ok || string(target) != "That topic/partition is already being consumed" { |
| 407 | t.Fatal("A partition cannot be consumed twice at the same time") |
| 408 | } |
| 409 | |
| 410 | safeClose(t, pc1) |
| 411 | safeClose(t, c) |
| 412 | broker0.Close() |
| 413 | } |
| 414 | |
| 415 | func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) { |
| 416 | // Given |
nothing calls this directly
no test coverage detected