Starting with a pre-existing commit, the block-builder resumes from the last known position, consuming new records, and ensures all of them are properly committed and flushed into blocks.
(t *testing.T)
| 171 | // the block-builder resumes from the last known position, consuming new records, |
| 172 | // and ensures all of them are properly committed and flushed into blocks. |
| 173 | func TestBlockbuilder_startWithCommit(t *testing.T) { |
| 174 | ctx, cancel := context.WithCancelCause(context.Background()) |
| 175 | t.Cleanup(func() { cancel(errors.New("test done")) }) |
| 176 | |
| 177 | k, address := testkafka.CreateCluster(t, 100, testTopic) |
| 178 | |
| 179 | kafkaCommits := atomic.NewInt32(0) |
| 180 | k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) { |
| 181 | kafkaCommits.Inc() |
| 182 | return nil, nil, false |
| 183 | }) |
| 184 | |
| 185 | store := newStore(ctx, t) |
| 186 | cfg := blockbuilderConfig(t, address, []int32{0}) |
| 187 | |
| 188 | client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic) |
| 189 | producedRecords := testkafka.SendTracesFor(t, ctx, client, 5*time.Second, 100*time.Millisecond, ingest.Encode) // Send for 5 seconds |
| 190 | |
| 191 | commitedAt := len(producedRecords) / 2 |
| 192 | // Commit half of the records |
| 193 | offsets := make(kadm.Offsets) |
| 194 | offsets.Add(kadm.Offset{ |
| 195 | Topic: testTopic, |
| 196 | Partition: 0, |
| 197 | At: producedRecords[commitedAt].Offset, |
| 198 | }) |
| 199 | admClient := kadm.NewClient(client) |
| 200 | require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets)) |
| 201 | |
| 202 | b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) |
| 203 | require.NoError(t, err) |
| 204 | require.NoError(t, services.StartAndAwaitRunning(ctx, b)) |
| 205 | t.Cleanup(func() { |
| 206 | require.NoError(t, services.StopAndAwaitTerminated(ctx, b)) |
| 207 | }) |
| 208 | |
| 209 | records := testkafka.SendTracesFor(t, ctx, client, 5*time.Second, 100*time.Millisecond, ingest.Encode) // Send for 5 seconds |
| 210 | producedRecords = append(producedRecords, records...) |
| 211 | |
| 212 | // Wait for record to be consumed and committed. |
| 213 | require.Eventually(t, func() bool { |
| 214 | return kafkaCommits.Load() > 0 |
| 215 | }, time.Minute, time.Second) |
| 216 | |
| 217 | // Wait for the block to be flushed. |
| 218 | require.Eventually(t, func() bool { |
| 219 | return countFlushedTraces(store) == len(producedRecords)-commitedAt |
| 220 | }, time.Minute, time.Second) |
| 221 | |
| 222 | // Check committed offset |
| 223 | requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1) |
| 224 | } |
| 225 | |
| 226 | // In case a block flush initially fails, the system retries until it succeeds. |
| 227 | func TestBlockbuilder_flushingFails(t *testing.T) { |
nothing calls this directly
no test coverage detected