(t *testing.T, ctx context.Context, r *Reader)
| 917 | } |
| 918 | |
| 919 | func testReaderConsumerGroupVerifyOffsetCommitted(t *testing.T, ctx context.Context, r *Reader) { |
| 920 | prepareReader(t, context.Background(), r, makeTestSequence(3)...) |
| 921 | |
| 922 | if _, err := r.FetchMessage(ctx); err != nil { |
| 923 | t.Errorf("bad err: %v", err) // skip the first message |
| 924 | } |
| 925 | |
| 926 | m, err := r.FetchMessage(ctx) |
| 927 | if err != nil { |
| 928 | t.Errorf("bad err: %v", err) |
| 929 | } |
| 930 | |
| 931 | if err := r.CommitMessages(ctx, m); err != nil { |
| 932 | t.Errorf("bad commit message: %v", err) |
| 933 | } |
| 934 | |
| 935 | offsets := getOffsets(t, r.config) |
| 936 | if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) { |
| 937 | t.Errorf("expected %v; got %v", expected, offsets) |
| 938 | } |
| 939 | } |
| 940 | |
| 941 | func testReaderConsumerGroupVerifyPeriodicOffsetCommitter(t *testing.T, ctx context.Context, r *Reader) { |
| 942 | prepareReader(t, context.Background(), r, makeTestSequence(3)...) |
nothing calls this directly
no test coverage detected