(t *testing.T, cfg KafkaConfig)
| 200 | } |
| 201 | |
| 202 | func createTestKafkaClient(t *testing.T, cfg KafkaConfig) *kgo.Client { |
| 203 | metrics := kprom.NewMetrics("", kprom.Registerer(prometheus.NewPedanticRegistry())) |
| 204 | opts := commonKafkaClientOptions(cfg, metrics, test.NewTestingLogger(t)) |
| 205 | |
| 206 | // Use the manual partitioner because produceRecord() utility explicitly specifies |
| 207 | // the partition to write to in the kgo.Record itself. |
| 208 | opts = append(opts, kgo.RecordPartitioner(kgo.ManualPartitioner())) |
| 209 | |
| 210 | client, err := kgo.NewClient(opts...) |
| 211 | require.NoError(t, err) |
| 212 | |
| 213 | // Automatically close it at the end of the test. |
| 214 | t.Cleanup(client.Close) |
| 215 | |
| 216 | return client |
| 217 | } |
| 218 | |
| 219 | func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, partitionID int32, content []byte) { |
| 220 | _ = produceRecordWithVersion(ctx, t, writeClient, partitionID, content, 1) |
no test coverage detected