(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error)
| 1345 | } |
| 1346 | |
| 1347 | func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error { |
| 1348 | id, err := c.doRequest(d, write) |
| 1349 | if err != nil { |
| 1350 | return err |
| 1351 | } |
| 1352 | |
| 1353 | deadline, size, lock, err := c.waitResponse(d, id) |
| 1354 | if err != nil { |
| 1355 | return err |
| 1356 | } |
| 1357 | |
| 1358 | if err = read(deadline, size); err != nil { |
| 1359 | var kafkaError Error |
| 1360 | if !errors.As(err, &kafkaError) { |
| 1361 | c.conn.Close() |
| 1362 | } |
| 1363 | } |
| 1364 | |
| 1365 | d.unsetConnReadDeadline() |
| 1366 | lock.Unlock() |
| 1367 | return err |
| 1368 | } |
| 1369 | |
| 1370 | func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) { |
| 1371 | c.enter() |
no test coverage detected