MCPcopy
hub / github.com/IBM/sarama / TestProduceSetPartitionTracking

Function TestProduceSetPartitionTracking

produce_set_test.go:91–136  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

89}
90
91func TestProduceSetPartitionTracking(t *testing.T) {
92 _, ps := makeProduceSet()
93
94 m1 := &ProducerMessage{Topic: "t1", Partition: 0}
95 m2 := &ProducerMessage{Topic: "t1", Partition: 1}
96 m3 := &ProducerMessage{Topic: "t2", Partition: 0}
97 safeAddMessage(t, ps, m1)
98 safeAddMessage(t, ps, m2)
99 safeAddMessage(t, ps, m3)
100
101 seenT1P0 := false
102 seenT1P1 := false
103 seenT2P0 := false
104
105 ps.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
106 if len(pSet.msgs) != 1 {
107 t.Error("Wrong message count")
108 }
109
110 if topic == "t1" && partition == 0 {
111 seenT1P0 = true
112 } else if topic == "t1" && partition == 1 {
113 seenT1P1 = true
114 } else if topic == "t2" && partition == 0 {
115 seenT2P0 = true
116 }
117 })
118
119 if !seenT1P0 {
120 t.Error("Didn't see t1p0")
121 }
122 if !seenT1P1 {
123 t.Error("Didn't see t1p1")
124 }
125 if !seenT2P0 {
126 t.Error("Didn't see t2p0")
127 }
128
129 if len(ps.dropPartition("t1", 1)) != 1 {
130 t.Error("Got wrong messages back from dropping partition")
131 }
132
133 if ps.bufferCount != 2 {
134 t.Error("Incorrect buffer count after dropping partition")
135 }
136}
137
138func TestProduceSetRequestBuilding(t *testing.T) {
139 parent, ps := makeProduceSet()

Callers

nothing calls this directly

Calls 5

makeProduceSet (Function) · 0.85
safeAddMessage (Function) · 0.85
eachPartition (Method) · 0.80
dropPartition (Method) · 0.80
Error (Method) · 0.65

Tested by

no test coverage detected