MCPcopy
hub / github.com/grafana/tempo / TestBlockbuilder_startWithCommit

Function TestBlockbuilder_startWithCommit

modules/blockbuilder/blockbuilder_test.go:173–224  ·  view source on GitHub ↗

Starting with a pre-existing commit, the block-builder resumes from the last known position, consuming new records, and ensures all of them are properly committed and flushed into blocks.

(t *testing.T)

Source from the content-addressed store, hash-verified

171// the block-builder resumes from the last known position, consuming new records,
172// and ensures all of them are properly committed and flushed into blocks.
173func TestBlockbuilder_startWithCommit(t *testing.T) {
174 ctx, cancel := context.WithCancelCause(context.Background())
175 t.Cleanup(func() { cancel(errors.New("test done")) })
176
177 k, address := testkafka.CreateCluster(t, 100, testTopic)
178
179 kafkaCommits := atomic.NewInt32(0)
180 k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
181 kafkaCommits.Inc()
182 return nil, nil, false
183 })
184
185 store := newStore(ctx, t)
186 cfg := blockbuilderConfig(t, address, []int32{0})
187
188 client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
189 producedRecords := testkafka.SendTracesFor(t, ctx, client, 5*time.Second, 100*time.Millisecond, ingest.Encode) // Send for 5 seconds
190
191 commitedAt := len(producedRecords) / 2
192 // Commit half of the records
193 offsets := make(kadm.Offsets)
194 offsets.Add(kadm.Offset{
195 Topic: testTopic,
196 Partition: 0,
197 At: producedRecords[commitedAt].Offset,
198 })
199 admClient := kadm.NewClient(client)
200 require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets))
201
202 b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
203 require.NoError(t, err)
204 require.NoError(t, services.StartAndAwaitRunning(ctx, b))
205 t.Cleanup(func() {
206 require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
207 })
208
209 records := testkafka.SendTracesFor(t, ctx, client, 5*time.Second, 100*time.Millisecond, ingest.Encode) // Send for 5 seconds
210 producedRecords = append(producedRecords, records...)
211
212 // Wait for record to be consumed and committed.
213 require.Eventually(t, func() bool {
214 return kafkaCommits.Load() > 0
215 }, time.Minute, time.Second)
216
217 // Wait for the block to be flushed.
218 require.Eventually(t, func() bool {
219 return countFlushedTraces(store) == len(producedRecords)-commitedAt
220 }, time.Minute, time.Second)
221
222 // Check committed offset
223 requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
224}
225
226// In case a block flush initially fails, the system retries until it succeeds.
227func TestBlockbuilder_flushingFails(t *testing.T) {

Callers

nothing calls this directly

Calls 13

CreateClusterFunction · 0.92
NewKafkaClientFunction · 0.92
SendTracesForFunction · 0.92
blockbuilderConfigFunction · 0.85
newPartitionRingReaderFunction · 0.85
countFlushedTracesFunction · 0.85
requireLastCommitEqualsFunction · 0.85
ControlKeyMethod · 0.80
newStoreFunction · 0.70
NewFunction · 0.70
testLoggerFunction · 0.70
IncMethod · 0.65

Tested by

no test coverage detected