MCPcopy
hub / github.com/segmentio/kafka-go / testReaderConsumerGroupVerifyPeriodicOffsetCommitter

Function testReaderConsumerGroupVerifyPeriodicOffsetCommitter

reader_test.go:941–968  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, r *Reader)

Source from the content-addressed store, hash-verified

939}
940
941func testReaderConsumerGroupVerifyPeriodicOffsetCommitter(t *testing.T, ctx context.Context, r *Reader) {
942 prepareReader(t, context.Background(), r, makeTestSequence(3)...)
943
944 if _, err := r.FetchMessage(ctx); err != nil {
945 t.Errorf("bad err: %v", err) // skip the first message
946 }
947
948 m, err := r.FetchMessage(ctx)
949 if err != nil {
950 t.Errorf("bad err: %v", err)
951 }
952
953 started := time.Now()
954 if err := r.CommitMessages(ctx, m); err != nil {
955 t.Errorf("bad commit message: %v", err)
956 }
957 if elapsed := time.Since(started); elapsed > 10*time.Millisecond {
958 t.Errorf("background commits should happen nearly instantly")
959 }
960
961 // wait for committer to pick up the commits
962 time.Sleep(r.config.CommitInterval * 3)
963
964 offsets := getOffsets(t, r.config)
965 if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) {
966 t.Errorf("expected %v; got %v", expected, offsets)
967 }
968}
969
970func testReaderConsumerGroupVerifyCommitsOnClose(t *testing.T, ctx context.Context, r *Reader) {
971 prepareReader(t, context.Background(), r, makeTestSequence(3)...)

Callers

nothing calls this directly

Calls 5

prepareReaderFunction · 0.85
makeTestSequenceFunction · 0.85
getOffsetsFunction · 0.85
FetchMessageMethod · 0.80
CommitMessagesMethod · 0.80

Tested by

no test coverage detected