MCPcopy
hub / github.com/segmentio/kafka-go / doRequest

Method doRequest

conn.go:1370–1388  ·  view source on GitHub ↗
(d *connDeadline, write func(time.Time, int32) error)

Source from the content-addressed store, hash-verified

1368}
1369
1370func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
1371 c.enter()
1372 c.wlock.Lock()
1373 c.correlationID++
1374 id = c.correlationID
1375 err = write(d.setConnWriteDeadline(c.conn), id)
1376 d.unsetConnWriteDeadline()
1377
1378 if err != nil {
1379 // When an error occurs there's no way to know if the connection is in a
1380 // recoverable state so we're better off just giving up at this point to
1381 // avoid any risk of corrupting the following operations.
1382 c.conn.Close()
1383 c.leave()
1384 }
1385
1386 c.wlock.Unlock()
1387 return
1388}
1389
1390func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
1391 for {

Callers 3

ReadBatchWith (method) · 0.95
do (method) · 0.95
ApiVersions (method) · 0.95

Calls 5

enter (method) · 0.95
leave (method) · 0.95
setConnWriteDeadline (method) · 0.80
Close (method) · 0.45

Tested by

no test coverage detected