MCPcopy
hub / github.com/IBM/sarama / topicWithEvenLeaders

Function topicWithEvenLeaders

functional_admin_test.go:20–66  ·  view source on GitHub ↗
(t *testing.T, adminClient ClusterAdmin, client Client, numPartitions int32)

Source from the content-addressed store, hash-verified

18)
19
20func topicWithEvenLeaders(t *testing.T, adminClient ClusterAdmin, client Client, numPartitions int32) (string, error) {
21 t.Helper()
22
23 if len(FunctionalTestEnv.KafkaBrokerAddrs) == 0 {
24 return "", fmt.Errorf("no brokers available for replica assignment")
25 }
26
27 brokers := client.Brokers()
28 brokerIDs := make([]int32, 0, len(brokers))
29 for _, broker := range brokers {
30 brokerIDs = append(brokerIDs, broker.ID())
31 }
32 if len(brokerIDs) == 0 {
33 return "", fmt.Errorf("no broker IDs available for replica assignment")
34 }
35 slices.Sort(brokerIDs)
36
37 topic := fmt.Sprintf("list-offsets-%d", time.Now().UnixNano())
38 replicaAssignment := make(map[int32][]int32, numPartitions)
39 for partition := int32(0); partition < numPartitions; partition++ {
40 brokerIndex := partition % int32(len(brokerIDs))
41 replicaAssignment[partition] = []int32{brokerIDs[brokerIndex]}
42 }
43
44 err := adminClient.CreateTopic(topic, &TopicDetail{
45 NumPartitions: -1,
46 ReplicationFactor: -1,
47 ReplicaAssignment: replicaAssignment,
48 }, false)
49 if err != nil {
50 return "", err
51 }
52
53 // topic creation is asynchronous; wait for every partition to have an
54 // elected leader before returning (producing too early fails with
55 // ErrNotLeaderForPartition or ErrUnknownTopicOrPartition)
56 require.EventuallyWithT(t, func(t *assert.CollectT) {
57 require.NoError(t, client.RefreshMetadata(topic))
58 for partition := int32(0); partition < numPartitions; partition++ {
59 leader, err := client.Leader(topic, partition)
60 require.NoError(t, err, "no leader for %s/%d", topic, partition)
61 require.NotNil(t, leader)
62 }
63 }, 30*time.Second, 250*time.Millisecond, "leaders were not elected for all partitions of %s", topic)
64
65 return topic, nil
66}
67
68func produceMessagesForPartitions(t *testing.T, client Client, topic string, partitionsCount int32, messagesPerPartition int, baseTimestamp int64) {
69 t.Helper()

Callers 2

TestFuncAdminListOffsetsFunction · 0.85

Calls 7

HelperMethod · 0.80
IDMethod · 0.80
ErrorfMethod · 0.65
BrokersMethod · 0.65
CreateTopicMethod · 0.65
RefreshMetadataMethod · 0.65
LeaderMethod · 0.65

Tested by

no test coverage detected