(t testing.TB, address, topic string)
| 14 | ) |
| 15 | |
| 16 | func NewKafkaClient(t testing.TB, address, topic string) *kgo.Client { |
| 17 | writeClient, err := kgo.NewClient( |
| 18 | kgo.SeedBrokers(address), |
| 19 | kgo.AllowAutoTopicCreation(), |
| 20 | kgo.DefaultProduceTopic(topic), |
| 21 | // We will choose the Partition of each record. |
| 22 | kgo.RecordPartitioner(kgo.ManualPartitioner()), |
| 23 | kgo.DisableClientMetrics(), |
| 24 | ) |
| 25 | require.NoError(t, err) |
| 26 | t.Cleanup(writeClient.Close) |
| 27 | |
| 28 | return writeClient |
| 29 | } |
| 30 | |
| 31 | type ReqOpts struct { |
| 32 | Partition int32 |
no outgoing calls