(t *testing.T, ctx context.Context, r *Reader)
| 175 | } |
| 176 | |
| 177 | func testReaderSetOffsetAt(t *testing.T, ctx context.Context, r *Reader) { |
| 178 | // We make 2 batches of messages here with a brief 2 second pause |
| 179 | // to ensure messages 0...9 will be written a few seconds before messages 10...19 |
| 180 | // We'll then fetch the timestamp for message offset 10 and use that timestamp to set |
| 181 | // our reader |
| 182 | const N = 10 |
| 183 | prepareReader(t, ctx, r, makeTestSequence(N)...) |
| 184 | time.Sleep(time.Second * 2) |
| 185 | prepareReader(t, ctx, r, makeTestSequence(N)...) |
| 186 | |
| 187 | var ts time.Time |
| 188 | for i := 0; i < N*2; i++ { |
| 189 | m, err := r.ReadMessage(ctx) |
| 190 | if err != nil { |
| 191 | t.Error("error reading message", err) |
| 192 | } |
| 193 | // grab the time for the 10th message |
| 194 | if i == 10 { |
| 195 | ts = m.Time |
| 196 | } |
| 197 | } |
| 198 | |
| 199 | err := r.SetOffsetAt(ctx, ts) |
| 200 | if err != nil { |
| 201 | t.Fatal("error setting offset by timestamp", err) |
| 202 | } |
| 203 | |
| 204 | m, err := r.ReadMessage(context.Background()) |
| 205 | if err != nil { |
| 206 | t.Fatal("error reading message", err) |
| 207 | } |
| 208 | |
| 209 | if m.Offset != 10 { |
| 210 | t.Errorf("expected offset of 10, received offset %d", m.Offset) |
| 211 | } |
| 212 | } |
| 213 | |
| 214 | func testReaderLag(t *testing.T, ctx context.Context, r *Reader) { |
| 215 | const N = 5 |
nothing calls this directly
no test coverage detected
searching dependent graphs…