(t *testing.T)
| 89 | } |
| 90 | |
| 91 | func TestProduceSetPartitionTracking(t *testing.T) { |
| 92 | _, ps := makeProduceSet() |
| 93 | |
| 94 | m1 := &ProducerMessage{Topic: "t1", Partition: 0} |
| 95 | m2 := &ProducerMessage{Topic: "t1", Partition: 1} |
| 96 | m3 := &ProducerMessage{Topic: "t2", Partition: 0} |
| 97 | safeAddMessage(t, ps, m1) |
| 98 | safeAddMessage(t, ps, m2) |
| 99 | safeAddMessage(t, ps, m3) |
| 100 | |
| 101 | seenT1P0 := false |
| 102 | seenT1P1 := false |
| 103 | seenT2P0 := false |
| 104 | |
| 105 | ps.eachPartition(func(topic string, partition int32, pSet *partitionSet) { |
| 106 | if len(pSet.msgs) != 1 { |
| 107 | t.Error("Wrong message count") |
| 108 | } |
| 109 | |
| 110 | if topic == "t1" && partition == 0 { |
| 111 | seenT1P0 = true |
| 112 | } else if topic == "t1" && partition == 1 { |
| 113 | seenT1P1 = true |
| 114 | } else if topic == "t2" && partition == 0 { |
| 115 | seenT2P0 = true |
| 116 | } |
| 117 | }) |
| 118 | |
| 119 | if !seenT1P0 { |
| 120 | t.Error("Didn't see t1p0") |
| 121 | } |
| 122 | if !seenT1P1 { |
| 123 | t.Error("Didn't see t1p1") |
| 124 | } |
| 125 | if !seenT2P0 { |
| 126 | t.Error("Didn't see t2p0") |
| 127 | } |
| 128 | |
| 129 | if len(ps.dropPartition("t1", 1)) != 1 { |
| 130 | t.Error("Got wrong messages back from dropping partition") |
| 131 | } |
| 132 | |
| 133 | if ps.bufferCount != 2 { |
| 134 | t.Error("Incorrect buffer count after dropping partition") |
| 135 | } |
| 136 | } |
| 137 | |
| 138 | func TestProduceSetRequestBuilding(t *testing.T) { |
| 139 | parent, ps := makeProduceSet() |
nothing calls this directly
no test coverage detected