MCPcopy
hub / github.com/segmentio/kafka-go / do

Method do

conn.go:1347–1368  ·  view source on GitHub ↗
(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error)

Source from the content-addressed store, hash-verified

1345}
1346
1347func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
1348 id, err := c.doRequest(d, write)
1349 if err != nil {
1350 return err
1351 }
1352
1353 deadline, size, lock, err := c.waitResponse(d, id)
1354 if err != nil {
1355 return err
1356 }
1357
1358 if err = read(deadline, size); err != nil {
1359 var kafkaError Error
1360 if !errors.As(err, &kafkaError) {
1361 c.conn.Close()
1362 }
1363 }
1364
1365 d.unsetConnReadDeadline()
1366 lock.Unlock()
1367 return err
1368}
1369
1370func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
1371 c.enter()

Callers 2

readOperationMethod · 0.95
writeOperationMethod · 0.95

Calls 5

doRequestMethod · 0.95
waitResponseMethod · 0.95
readFunction · 0.85
unsetConnReadDeadlineMethod · 0.80
CloseMethod · 0.45

Tested by

no test coverage detected