(t *testing.T, ctx context.Context, r *Reader)
| 968 | } |
| 969 | |
| 970 | func testReaderConsumerGroupVerifyCommitsOnClose(t *testing.T, ctx context.Context, r *Reader) { |
| 971 | prepareReader(t, context.Background(), r, makeTestSequence(3)...) |
| 972 | |
| 973 | if _, err := r.FetchMessage(ctx); err != nil { |
| 974 | t.Errorf("bad err: %v", err) // skip the first message |
| 975 | } |
| 976 | |
| 977 | m, err := r.FetchMessage(ctx) |
| 978 | if err != nil { |
| 979 | t.Errorf("bad err: %v", err) |
| 980 | } |
| 981 | |
| 982 | if err := r.CommitMessages(ctx, m); err != nil { |
| 983 | t.Errorf("bad commit message: %v", err) |
| 984 | } |
| 985 | |
| 986 | if err := r.Close(); err != nil { |
| 987 | t.Errorf("bad Close: %v", err) |
| 988 | } |
| 989 | |
| 990 | r2 := NewReader(r.config) |
| 991 | defer r2.Close() |
| 992 | |
| 993 | offsets := getOffsets(t, r2.config) |
| 994 | if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) { |
| 995 | t.Errorf("expected %v; got %v", expected, offsets) |
| 996 | } |
| 997 | } |
| 998 | |
| 999 | func testReaderConsumerGroupReadContentAcrossPartitions(t *testing.T, ctx context.Context, r *Reader) { |
| 1000 | const N = 12 |
nothing calls this directly
no test coverage detected