(d *connDeadline, id int32)
| 1388 | } |
| 1389 | |
| 1390 | func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) { |
| 1391 | for { |
| 1392 | var rsz int32 |
| 1393 | var rid int32 |
| 1394 | |
| 1395 | c.rlock.Lock() |
| 1396 | deadline = d.setConnReadDeadline(c.conn) |
| 1397 | rsz, rid, err = c.peekResponseSizeAndID() |
| 1398 | |
| 1399 | if err != nil { |
| 1400 | d.unsetConnReadDeadline() |
| 1401 | c.conn.Close() |
| 1402 | c.rlock.Unlock() |
| 1403 | break |
| 1404 | } |
| 1405 | |
| 1406 | if id == rid { |
| 1407 | c.skipResponseSizeAndID() |
| 1408 | size, lock = int(rsz-4), &c.rlock |
| 1409 | // Don't unlock the read mutex to yield ownership to the caller. |
| 1410 | break |
| 1411 | } |
| 1412 | |
| 1413 | if c.concurrency() == 1 { |
| 1414 | // If the goroutine is the only one waiting on this connection it |
| 1415 | // should be impossible to read a correlation id different from the |
| 1416 | // one it expects. This is a sign that the data we are reading on |
| 1417 | // the wire is corrupted and the connection needs to be closed. |
| 1418 | err = io.ErrNoProgress |
| 1419 | c.rlock.Unlock() |
| 1420 | break |
| 1421 | } |
| 1422 | |
| 1423 | // Optimistically release the read lock if a response has already |
| 1424 | // been received but the current operation is not the target for it. |
| 1425 | c.rlock.Unlock() |
| 1426 | } |
| 1427 | |
| 1428 | c.leave() |
| 1429 | return |
| 1430 | } |
| 1431 | |
| 1432 | func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32) requestHeader { |
| 1433 | return requestHeader{ |
no test coverage detected