MCPcopy
hub / github.com/IBM/sarama / TestConsumerRecreate

Function TestConsumerRecreate

consumer_test.go:334–372  ·  view source on GitHub ↗

It is possible to close a partition consumer and create the same anew.

(t *testing.T)

Source from the content-addressed store, hash-verified

332
333// TestConsumerRecreate verifies that a partition consumer can be closed and then created again for the same topic and partition.
334func TestConsumerRecreate(t *testing.T) {
335 // Given: a mock broker with handlers registered for MetadataRequest, OffsetRequest and FetchRequest on "my_topic".
336 broker0 := NewMockBroker(t, 0)
337 broker0.SetHandlerByMap(map[string]MockResponse{
338 "MetadataRequest": NewMockMetadataResponse(t).
339 SetBroker(broker0.Addr(), broker0.BrokerID()).
340 SetLeader("my_topic", 0, broker0.BrokerID()),
341 "OffsetRequest": NewMockOffsetResponse(t).
342 SetOffset("my_topic", 0, OffsetOldest, 0).
343 SetOffset("my_topic", 0, OffsetNewest, 1000),
344 "FetchRequest": NewMockFetchResponse(t, 1).
345 SetMessage("my_topic", 0, 10, testMsg),
346 })
347
348 c, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig())
349 if err != nil {
350 t.Fatal(err)
351 }
352
353 pc, err := c.ConsumePartition("my_topic", 0, 10)
354 if err != nil {
355 t.Fatal(err)
356 }
357 assertMessageOffset(t, <-pc.Messages(), 10) // receive one message from the first partition consumer before closing it
358
359 // When: the partition consumer is closed and ConsumePartition is called again with the same arguments.
360 safeClose(t, pc)
361 pc, err = c.ConsumePartition("my_topic", 0, 10)
362 if err != nil {
363 t.Fatal(err)
364 }
365
366 // Then: the re-created partition consumer yields a message that passes the same assertMessageOffset check against 10.
367 assertMessageOffset(t, <-pc.Messages(), 10)
368
369 safeClose(t, pc) // teardown order: partition consumer, then consumer, then the mock broker
370 safeClose(t, c)
371 broker0.Close()
372}
373
374// An attempt to consume the same partition twice should fail.
375func TestConsumerDuplicate(t *testing.T) {

Callers

nothing calls this directly

Calls 15

SetHandlerByMap · Method · 0.95
Addr · Method · 0.95
BrokerID · Method · 0.95
ConsumePartition · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
NewMockMetadataResponse · Function · 0.85
NewMockOffsetResponse · Function · 0.85
NewMockFetchResponse · Function · 0.85
assertMessageOffset · Function · 0.85
SetLeader · Method · 0.80
SetBroker · Method · 0.80

Tested by

no test coverage detected