(v int16)
| 1382 | } |
| 1383 | |
| 1384 | func (b *Broker) sendAndReceiveApiVersions(v int16) (*ApiVersionsResponse, error) { |
| 1385 | rb := &ApiVersionsRequest{ |
| 1386 | Version: v, |
| 1387 | ClientSoftwareName: defaultClientSoftwareName, |
| 1388 | ClientSoftwareVersion: version(), |
| 1389 | } |
| 1390 | |
| 1391 | req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb} |
| 1392 | buf, err := encode(req, b.metricRegistry) |
| 1393 | if err != nil { |
| 1394 | return nil, err |
| 1395 | } |
| 1396 | |
| 1397 | requestTime := time.Now() |
| 1398 | // Will be decremented in updateIncomingCommunicationMetrics (except error) |
| 1399 | b.addRequestInFlightMetrics(1) |
| 1400 | bytes, err := b.write(buf) |
| 1401 | b.updateOutgoingCommunicationMetrics(bytes) |
| 1402 | if err != nil { |
| 1403 | b.addRequestInFlightMetrics(-1) |
| 1404 | Logger.Printf("Failed to send ApiVersionsRequest V%d to %s: %s\n", v, b.addr, err) |
| 1405 | return nil, err |
| 1406 | } |
| 1407 | b.correlationID++ |
| 1408 | |
| 1409 | // Kafka protocol response structure: |
| 1410 | // - Message length (4 bytes): Total length of the response excluding this field |
| 1411 | // - ResponseHeader v0 (4 bytes): Contains correlation ID for request-response matching |
| 1412 | header := make([]byte, 8) |
| 1413 | _, err = b.readFull(header) |
| 1414 | if err != nil { |
| 1415 | b.addRequestInFlightMetrics(-1) |
| 1416 | Logger.Printf("Failed to read ApiVersionsResponse V%d header from %s: %s\n", v, b.addr, err) |
| 1417 | return nil, err |
| 1418 | } |
| 1419 | |
| 1420 | length := binary.BigEndian.Uint32(header[:4]) |
| 1421 | // we're not using the correlation ID here, but it is part of the response header |
| 1422 | // correlationID := binary.BigEndian.Uint32(header[4:]) |
| 1423 | |
| 1424 | payload := make([]byte, length-4) |
| 1425 | n, err := b.readFull(payload) |
| 1426 | if err != nil { |
| 1427 | b.addRequestInFlightMetrics(-1) |
| 1428 | Logger.Printf("Failed to read ApiVersionsResponse V%d payload from %s: %s\n", v, b.addr, err) |
| 1429 | return nil, err |
| 1430 | } |
| 1431 | |
| 1432 | b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime)) |
| 1433 | res := &ApiVersionsResponse{Version: rb.version()} |
| 1434 | err = versionedDecode(payload, res, rb.version(), b.metricRegistry) |
| 1435 | if err != nil { |
| 1436 | Logger.Printf("Failed to parse ApiVersionsResponse V%d from %s: %s\n", v, b.addr, err) |
| 1437 | return nil, err |
| 1438 | } |
| 1439 | |
| 1440 | kerr := KError(res.ErrorCode) |
| 1441 | if kerr != ErrNoError { |
no test coverage detected