MCPcopy
hub / github.com/grafana/tempo / createTestKafkaClient

Function createTestKafkaClient

pkg/ingest/partition_offset_client_test.go:202–217  ·  view source on GitHub ↗
(t *testing.T, cfg KafkaConfig)

Source from the content-addressed store, hash-verified

200}
201
202func createTestKafkaClient(t *testing.T, cfg KafkaConfig) *kgo.Client {
203 metrics := kprom.NewMetrics("", kprom.Registerer(prometheus.NewPedanticRegistry()))
204 opts := commonKafkaClientOptions(cfg, metrics, test.NewTestingLogger(t))
205
206 // Use the manual partitioner because produceRecord() utility explicitly specifies
207 // the partition to write to in the kgo.Record itself.
208 opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner()))
209
210 client, err := kgo.NewClient(opts...)
211 require.NoError(t, err)
212
213 // Automatically close it at the end of the test.
214 t.Cleanup(client.Close)
215
216 return client
217}
218
219func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, partitionID int32, content []byte) {
220 _ = produceRecordWithVersion(ctx, t, writeClient, partitionID, content, 1)

Calls 2

NewTestingLoggerFunction · 0.92
commonKafkaClientOptionsFunction · 0.85

Tested by

no test coverage detected