MCPcopy
hub / github.com/IBM/sarama / TestOffsetManagerCommitConcurrentDeadlock

Function TestOffsetManagerCommitConcurrentDeadlock

offset_manager_test.go:216–343  ·  view source on GitHub ↗

Tests that concurrent Commit() calls do not deadlock when the coordinator returns errors that trigger releaseCoordinator while other goroutines are in constructRequest or coordinator(). This reproduces the lock ordering issue from https://github.com/IBM/sarama/issues/3191 where four goroutines form a cycle: broker.lock -> pomsLock -> brokerLock -> broker.lock.

(t *testing.T)

Source from the content-addressed store, hash-verified

214// issue from https://github.com/IBM/sarama/issues/3191 where four goroutines
215// form a cycle: broker.lock -> pomsLock -> brokerLock -> broker.lock.
216func TestOffsetManagerCommitConcurrentDeadlock(t *testing.T) {
217 seedBroker := NewMockBroker(t, 1)
218 defer seedBroker.Close()
219
220 coordinatorBroker := NewMockBroker(t, 2)
221 defer coordinatorBroker.Close()
222
223 findCoordDelay := 500 * time.Microsecond
224 seedBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
225 "MetadataRequest": func(req *request) encoderWithHeader {
226 resp := new(MetadataResponse)
227 resp.AddBroker(coordinatorBroker.Addr(), coordinatorBroker.BrokerID())
228 return resp
229 },
230 "FindCoordinatorRequest": func(req *request) encoderWithHeader {
231 // slow coordinator lookup increases the window where brokerLock
232 // is held in coordinator(), while LeastLoadedBroker calls
233 // ResponseSize (which needs broker.lock), widening the
234 // deadlock window
235 time.Sleep(findCoordDelay)
236 resp := new(FindCoordinatorResponse)
237 resp.Coordinator = &Broker{id: coordinatorBroker.brokerID, addr: coordinatorBroker.Addr()}
238 return resp
239 },
240 })
241
242 var commitCount atomic.Int64
243 coordinatorBroker.SetHandlerFuncByMap(map[string]requestHandlerFunc{
244 "ConsumerMetadataRequest": func(req *request) encoderWithHeader {
245 return &ConsumerMetadataResponse{
246 CoordinatorID: coordinatorBroker.BrokerID(),
247 CoordinatorHost: "127.0.0.1",
248 CoordinatorPort: coordinatorBroker.Port(),
249 }
250 },
251 "FindCoordinatorRequest": func(req *request) encoderWithHeader {
252 time.Sleep(findCoordDelay)
253 resp := new(FindCoordinatorResponse)
254 resp.Coordinator = &Broker{id: coordinatorBroker.brokerID, addr: coordinatorBroker.Addr()}
255 return resp
256 },
257 "OffsetFetchRequest": func(r *request) encoderWithHeader {
258 req := r.body.(*OffsetFetchRequest)
259 resp := new(OffsetFetchResponse)
260 resp.Blocks = map[string]map[int32]*OffsetFetchResponseBlock{}
261 for topic, partitions := range req.partitions {
262 for _, partition := range partitions {
263 if _, ok := resp.Blocks[topic]; !ok {
264 resp.Blocks[topic] = map[int32]*OffsetFetchResponseBlock{}
265 }
266 resp.Blocks[topic][partition] = &OffsetFetchResponseBlock{
267 Offset: 0,
268 Err: ErrNoError,
269 }
270 }
271 }
272 return resp
273 },

Callers

nothing calls this directly

Calls 15

CloseMethod · 0.95
SetHandlerFuncByMapMethod · 0.95
AddrMethod · 0.95
BrokerIDMethod · 0.95
PortMethod · 0.95
NewMockBrokerFunction · 0.85
AddBrokerMethod · 0.80
FatalMethod · 0.80
NewTestConfigFunction · 0.70
NewClientFunction · 0.70
safeCloseFunction · 0.70

Tested by

no test coverage detected