MCPcopy
hub / github.com/IBM/sarama / TestConsumerDuplicate

Function TestConsumerDuplicate

consumer_test.go:375–413  ·  view source on GitHub ↗

An attempt to consume the same partition twice should fail.

(t *testing.T)

Source from the content-addressed store, hash-verified

373
374// An attempt to consume the same partition twice should fail.
375func TestConsumerDuplicate(t *testing.T) {
376 // Given
377 broker0 := NewMockBroker(t, 0)
378 broker0.SetHandlerByMap(map[string]MockResponse{
379 "MetadataRequest": NewMockMetadataResponse(t).
380 SetBroker(broker0.Addr(), broker0.BrokerID()).
381 SetLeader("my_topic", 0, broker0.BrokerID()),
382 "OffsetRequest": NewMockOffsetResponse(t).
383 SetOffset("my_topic", 0, OffsetOldest, 0).
384 SetOffset("my_topic", 0, OffsetNewest, 1000),
385 "FetchRequest": NewMockFetchResponse(t, 1),
386 })
387
388 config := NewTestConfig()
389 config.ChannelBufferSize = 0
390 c, err := NewConsumer([]string{broker0.Addr()}, config)
391 if err != nil {
392 t.Fatal(err)
393 }
394
395 pc1, err := c.ConsumePartition("my_topic", 0, 0)
396 if err != nil {
397 t.Fatal(err)
398 }
399
400 // When
401 pc2, err := c.ConsumePartition("my_topic", 0, 0)
402
403 // Then
404 var target ConfigurationError
405 ok := errors.As(err, &target)
406 if pc2 != nil || !ok || string(target) != "That topic/partition is already being consumed" {
407 t.Fatal("A partition cannot be consumed twice at the same time")
408 }
409
410 safeClose(t, pc1)
411 safeClose(t, c)
412 broker0.Close()
413}
414
415func runConsumerLeaderRefreshErrorTestWithConfig(t *testing.T, config *Config) {
416 // Given

Callers

nothing calls this directly

Calls 15

SetHandlerByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
ConsumePartitionMethod · 0.95
CloseMethod · 0.95
NewMockBrokerFunction · 0.85
NewMockMetadataResponseFunction · 0.85
NewMockOffsetResponseFunction · 0.85
NewMockFetchResponseFunction · 0.85
SetLeaderMethod · 0.80
SetBrokerMethod · 0.80
FatalMethod · 0.80

Tested by

no test coverage detected