Test that concurrent Commit() calls do not deadlock when the coordinator returns errors that trigger releaseCoordinator while other goroutines are in constructRequest or coordinator(). This reproduces the lock-ordering issue from https://github.com/IBM/sarama/issues/3191 where four goroutines form a cycle: broker.lock -> pomsLock -> brokerLock -> broker.lock.
(t *testing.T)
| 214 | // issue from https://github.com/IBM/sarama/issues/3191 where four goroutines |
| 215 | // form a cycle: broker.lock -> pomsLock -> brokerLock -> broker.lock. |
| 216 | func TestOffsetManagerCommitConcurrentDeadlock(t *testing.T) { |
| 217 | seedBroker := NewMockBroker(t, 1) |
| 218 | defer seedBroker.Close() |
| 219 | |
| 220 | coordinatorBroker := NewMockBroker(t, 2) |
| 221 | defer coordinatorBroker.Close() |
| 222 | |
| 223 | findCoordDelay := 500 * time.Microsecond |
| 224 | seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
| 225 | "MetadataRequest": func(req *request) encoderWithHeader { |
| 226 | resp := new(MetadataResponse) |
| 227 | resp.AddBroker(coordinatorBroker.Addr(), coordinatorBroker.BrokerID()) |
| 228 | return resp |
| 229 | }, |
| 230 | "FindCoordinatorRequest": func(req *request) encoderWithHeader { |
| 231 | // slow coordinator lookup increases the window where brokerLock |
| 232 | // is held in coordinator(), while LeastLoadedBroker calls |
| 233 | // ResponseSize (which needs broker.lock), widening the |
| 234 | // deadlock window |
| 235 | time.Sleep(findCoordDelay) |
| 236 | resp := new(FindCoordinatorResponse) |
| 237 | resp.Coordinator = &Broker{id: coordinatorBroker.brokerID, addr: coordinatorBroker.Addr()} |
| 238 | return resp |
| 239 | }, |
| 240 | }) |
| 241 | |
| 242 | var commitCount atomic.Int64 |
| 243 | coordinatorBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{ |
| 244 | "ConsumerMetadataRequest": func(req *request) encoderWithHeader { |
| 245 | return &ConsumerMetadataResponse{ |
| 246 | CoordinatorID: coordinatorBroker.BrokerID(), |
| 247 | CoordinatorHost: "127.0.0.1", |
| 248 | CoordinatorPort: coordinatorBroker.Port(), |
| 249 | } |
| 250 | }, |
| 251 | "FindCoordinatorRequest": func(req *request) encoderWithHeader { |
| 252 | time.Sleep(findCoordDelay) |
| 253 | resp := new(FindCoordinatorResponse) |
| 254 | resp.Coordinator = &Broker{id: coordinatorBroker.brokerID, addr: coordinatorBroker.Addr()} |
| 255 | return resp |
| 256 | }, |
| 257 | "OffsetFetchRequest": func(r *request) encoderWithHeader { |
| 258 | req := r.body.(*OffsetFetchRequest) |
| 259 | resp := new(OffsetFetchResponse) |
| 260 | resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{} |
| 261 | for topic, partitions := range req.partitions { |
| 262 | for _, partition := range partitions { |
| 263 | if _, ok := resp.Blocks[topic]; !ok { |
| 264 | resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{} |
| 265 | } |
| 266 | resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{ |
| 267 | Offset: 0, |
| 268 | Err: ErrNoError, |
| 269 | } |
| 270 | } |
| 271 | } |
| 272 | return resp |
| 273 | }, |
nothing calls this directly
no test coverage detected