MCPcopy
hub / github.com/segmentio/kafka-go / waitResponse

Method waitResponse

conn.go:1390–1430  ·  view source on GitHub ↗
(d *connDeadline, id int32)

Source from the content-addressed store, hash-verified

1388}
1389
1390func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
1391 for {
1392 var rsz int32
1393 var rid int32
1394
1395 c.rlock.Lock()
1396 deadline = d.setConnReadDeadline(c.conn)
1397 rsz, rid, err = c.peekResponseSizeAndID()
1398
1399 if err != nil {
1400 d.unsetConnReadDeadline()
1401 c.conn.Close()
1402 c.rlock.Unlock()
1403 break
1404 }
1405
1406 if id == rid {
1407 c.skipResponseSizeAndID()
1408 size, lock = int(rsz-4), &c.rlock
1409 // Don't unlock the read mutex to yield ownership to the caller.
1410 break
1411 }
1412
1413 if c.concurrency() == 1 {
1414 // If the goroutine is the only one waiting on this connection it
1415 // should be impossible to read a correlation id different from the
1416 // one it expects. This is a sign that the data we are reading on
1417 // the wire is corrupted and the connection needs to be closed.
1418 err = io.ErrNoProgress
1419 c.rlock.Unlock()
1420 break
1421 }
1422
1423 // Optimistically release the read lock if a response has already
1424 // been received but the current operation is not the target for it.
1425 c.rlock.Unlock()
1426 }
1427
1428 c.leave()
1429 return
1430}
1431
1432func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32) requestHeader {
1433 return requestHeader{

Callers 3

ReadBatchWithMethod · 0.95
doMethod · 0.95
ApiVersionsMethod · 0.95

Calls 7

peekResponseSizeAndIDMethod · 0.95
skipResponseSizeAndIDMethod · 0.95
concurrencyMethod · 0.95
leaveMethod · 0.95
setConnReadDeadlineMethod · 0.80
unsetConnReadDeadlineMethod · 0.80
CloseMethod · 0.45

Tested by

no test coverage detected