MCPcopy
hub / github.com/segmentio/kafka-go / testReaderSetOffsetAt

Function testReaderSetOffsetAt

reader_test.go:177–212  ·  view source on GitHub ↗
(t *testing.T, ctx context.Context, r *Reader)

Source from the content-addressed store, hash-verified

175}
176
177func testReaderSetOffsetAt(t *testing.T, ctx context.Context, r *Reader) {
178 // We make 2 batches of messages here with a brief 2 second pause
179 // to ensure messages 0...9 will be written a few seconds before messages 10...19
180 // We'll then fetch the timestamp for message offset 10 and use that timestamp to set
181 // our reader
182 const N = 10
183 prepareReader(t, ctx, r, makeTestSequence(N)...)
184 time.Sleep(time.Second * 2)
185 prepareReader(t, ctx, r, makeTestSequence(N)...)
186
187 var ts time.Time
188 for i := 0; i < N*2; i++ {
189 m, err := r.ReadMessage(ctx)
190 if err != nil {
191 t.Error("error reading message", err)
192 }
193 // grab the time for the 10th message
194 if i == 10 {
195 ts = m.Time
196 }
197 }
198
199 err := r.SetOffsetAt(ctx, ts)
200 if err != nil {
201 t.Fatal("error setting offset by timestamp", err)
202 }
203
204 m, err := r.ReadMessage(context.Background())
205 if err != nil {
206 t.Fatal("error reading message", err)
207 }
208
209 if m.Offset != 10 {
210 t.Errorf("expected offset of 10, received offset %d", m.Offset)
211 }
212}
213
214func testReaderLag(t *testing.T, ctx context.Context, r *Reader) {
215 const N = 5

Callers

nothing calls this directly

Calls 5

prepareReader · Function · 0.85
makeTestSequence · Function · 0.85
SetOffsetAt · Method · 0.80
ReadMessage · Method · 0.45
Error · Method · 0.45

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…