(t *testing.T)
| 119 | } |
| 120 | |
| 121 | func TestConsume_RaceTest(t *testing.T) { |
| 122 | const ( |
| 123 | groupID = "test-group" |
| 124 | topic = "test-topic" |
| 125 | offsetStart = int64(1234) |
| 126 | ) |
| 127 | |
| 128 | cfg := NewTestConfig() |
| 129 | cfg.Version = V2_8_1_0 |
| 130 | cfg.Consumer.Return.Errors = true |
| 131 | cfg.Metadata.Full = true |
| 132 | |
| 133 | seedBroker := NewMockBroker(t, 1) |
| 134 | defer seedBroker.Close() |
| 135 | |
| 136 | handlerMap := map[string]MockResponse{ |
| 137 | "ApiVersionsRequest": NewMockApiVersionsResponse(t), |
| 138 | "MetadataRequest": NewMockMetadataResponse(t). |
| 139 | SetBroker(seedBroker.Addr(), seedBroker.BrokerID()). |
| 140 | SetError("mismatched-topic", ErrUnknownTopicOrPartition), |
| 141 | "OffsetRequest": NewMockOffsetResponse(t). |
| 142 | SetOffset(topic, 0, -1, offsetStart), |
| 143 | "OffsetFetchRequest": NewMockOffsetFetchResponse(t). |
| 144 | SetOffset(groupID, topic, 0, offsetStart, "", ErrNoError), |
| 145 | "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t). |
| 146 | SetCoordinator(CoordinatorGroup, groupID, seedBroker), |
| 147 | "JoinGroupRequest": NewMockJoinGroupResponse(t), |
| 148 | "SyncGroupRequest": NewMockSyncGroupResponse(t).SetMemberAssignment( |
| 149 | &ConsumerGroupMemberAssignment{ |
| 150 | Version: 1, |
| 151 | Topics: map[string][]int32{topic: {0}}, // map "test-topic" to partition 0 |
| 152 | UserData: []byte{0x01}, |
| 153 | }, |
| 154 | ), |
| 155 | "HeartbeatRequest": NewMockHeartbeatResponse(t), |
| 156 | } |
| 157 | seedBroker.SetHandlerByMap(handlerMap) |
| 158 | |
| 159 | cancelCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) |
| 160 | |
| 161 | retryWait := 10 * time.Millisecond |
| 162 | var err error |
| 163 | clientRetries := 0 |
| 164 | outerFor: |
| 165 | for { |
| 166 | _, err = NewConsumerGroup([]string{seedBroker.Addr()}, groupID, cfg) |
| 167 | if err == nil { |
| 168 | break |
| 169 | } |
| 170 | |
| 171 | if retryWait < time.Minute { |
| 172 | retryWait *= 2 |
| 173 | } |
| 174 | |
| 175 | clientRetries++ |
| 176 | |
| 177 | timer := time.NewTimer(retryWait) |
| 178 | select { |
nothing calls this directly
no test coverage detected