MCPcopy
hub / github.com/segmentio/kafka-go / testReaderConsumerGroupVerifyCommitsOnClose

Function testReaderConsumerGroupVerifyCommitsOnClose

reader_test.go:970–997  ·  reader_test.go::testReaderConsumerGroupVerifyCommitsOnClose
(t *testing.T, ctx context.Context, r *Reader)

Source from the content-addressed store, hash-verified

968}
969
970func testReaderConsumerGroupVerifyCommitsOnClose(t *testing.T, ctx context.Context, r *Reader) {
971 prepareReader(t, context.Background(), r, makeTestSequence(3)...)
972
973 if _, err := r.FetchMessage(ctx); err != nil {
974 t.Errorf("bad err: %v", err) // skip the first message
975 }
976
977 m, err := r.FetchMessage(ctx)
978 if err != nil {
979 t.Errorf("bad err: %v", err)
980 }
981
982 if err := r.CommitMessages(ctx, m); err != nil {
983 t.Errorf("bad commit message: %v", err)
984 }
985
986 if err := r.Close(); err != nil {
987 t.Errorf("bad Close: %v", err)
988 }
989
990 r2 := NewReader(r.config)
991 defer r2.Close()
992
993 offsets := getOffsets(t, r2.config)
994 if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) {
995 t.Errorf("expected %v; got %v", expected, offsets)
996 }
997}
998
999func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx context.Context, r *Reader) {
1000 const N = 12

Callers

nothing calls this directly

Calls 8

CloseMethod · 0.95
prepareReaderFunction · 0.85
makeTestSequenceFunction · 0.85
NewReaderFunction · 0.85
getOffsetsFunction · 0.85
FetchMessageMethod · 0.80
CommitMessagesMethod · 0.80
CloseMethod · 0.45

Tested by

no test coverage detected