(t *testing.T, ctx context.Context, r *Reader)
| 939 | } |
| 940 | |
| 941 | func testReaderConsumerGroupVerifyPeriodicOffsetCommitter(t *testing.T, ctx context.Context, r *Reader) { |
| 942 | prepareReader(t, context.Background(), r, makeTestSequence(3)...) |
| 943 | |
| 944 | if _, err := r.FetchMessage(ctx); err != nil { |
| 945 | t.Errorf("bad err: %v", err) // skip the first message |
| 946 | } |
| 947 | |
| 948 | m, err := r.FetchMessage(ctx) |
| 949 | if err != nil { |
| 950 | t.Errorf("bad err: %v", err) |
| 951 | } |
| 952 | |
| 953 | started := time.Now() |
| 954 | if err := r.CommitMessages(ctx, m); err != nil { |
| 955 | t.Errorf("bad commit message: %v", err) |
| 956 | } |
| 957 | if elapsed := time.Since(started); elapsed > 10*time.Millisecond { |
| 958 | t.Errorf("background commits should happen nearly instantly") |
| 959 | } |
| 960 | |
| 961 | // wait for committer to pick up the commits |
| 962 | time.Sleep(r.config.CommitInterval * 3) |
| 963 | |
| 964 | offsets := getOffsets(t, r.config) |
| 965 | if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) { |
| 966 | t.Errorf("expected %v; got %v", expected, offsets) |
| 967 | } |
| 968 | } |
| 969 | |
| 970 | func testReaderConsumerGroupVerifyCommitsOnClose(t *testing.T, ctx context.Context, r *Reader) { |
| 971 | prepareReader(t, context.Background(), r, makeTestSequence(3)...) |
nothing calls this directly
no test coverage detected