CreateCluster returns a fake Kafka cluster for unit testing.
(t testing.TB, numPartitions int32, topicName string)
| 24 | |
| 25 | // CreateCluster returns a fake Kafka cluster for unit testing. |
| 26 | func CreateCluster(t testing.TB, numPartitions int32, topicName string) (*Cluster, string) { |
| 27 | fake, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(numPartitions, topicName)) |
| 28 | require.NoError(t, err) |
| 29 | t.Cleanup(fake.Close) |
| 30 | |
| 31 | addrs := fake.ListenAddrs() |
| 32 | require.Len(t, addrs, 1) |
| 33 | |
| 34 | c := &Cluster{ |
| 35 | t: t, |
| 36 | fake: fake, |
| 37 | topic: topicName, |
| 38 | numPartitions: int(numPartitions), |
| 39 | committedOffsets: map[string][]int64{}, |
| 40 | controlFuncs: map[kmsg.Key]controlFn{}, |
| 41 | } |
| 42 | |
| 43 | // Add support for consumer groups |
| 44 | c.fake.ControlKey(kmsg.OffsetCommit.Int16(), c.offsetCommit) |
| 45 | c.fake.ControlKey(kmsg.OffsetFetch.Int16(), c.offsetFetch) |
| 46 | |
| 47 | return c, addrs[0] |
| 48 | } |
| 49 | |
| 50 | func (c *Cluster) ControlKey(key kmsg.Key, fn controlFn) { |
| 51 | switch key { |