MCPcopy
hub / github.com/IBM/sarama / sendAndReceiveApiVersions

Method sendAndReceiveApiVersions

broker.go:1384–1447  ·  view source on GitHub ↗
(v int16)

Source from the content-addressed store, hash-verified

1382}
1383
1384func (b *Broker) sendAndReceiveApiVersions(v int16) (*ApiVersionsResponse, error) {
1385 rb := &ApiVersionsRequest{
1386 Version: v,
1387 ClientSoftwareName: defaultClientSoftwareName,
1388 ClientSoftwareVersion: version(),
1389 }
1390
1391 req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
1392 buf, err := encode(req, b.metricRegistry)
1393 if err != nil {
1394 return nil, err
1395 }
1396
1397 requestTime := time.Now()
1398 // Will be decremented in updateIncomingCommunicationMetrics (except on error, where it is decremented explicitly below)
1399 b.addRequestInFlightMetrics(1)
1400 bytes, err := b.write(buf)
1401 b.updateOutgoingCommunicationMetrics(bytes)
1402 if err != nil {
1403 b.addRequestInFlightMetrics(-1)
1404 Logger.Printf("Failed to send ApiVersionsRequest V%d to %s: %s\n", v, b.addr, err)
1405 return nil, err
1406 }
1407 b.correlationID++
1408
1409 // Kafka protocol response structure:
1410 // - Message length (4 bytes): Total length of the response excluding this field
1411 // - ResponseHeader v0 (4 bytes): Contains correlation ID for request-response matching
1412 header := make([]byte, 8)
1413 _, err = b.readFull(header)
1414 if err != nil {
1415 b.addRequestInFlightMetrics(-1)
1416 Logger.Printf("Failed to read ApiVersionsResponse V%d header from %s: %s\n", v, b.addr, err)
1417 return nil, err
1418 }
1419
1420 length := binary.BigEndian.Uint32(header[:4])
1421 // we're not using the correlation ID here, but it is part of the response header
1422 // correlationID := binary.BigEndian.Uint32(header[4:])
1423
1424 payload := make([]byte, length-4)
1425 n, err := b.readFull(payload)
1426 if err != nil {
1427 b.addRequestInFlightMetrics(-1)
1428 Logger.Printf("Failed to read ApiVersionsResponse V%d payload from %s: %s\n", v, b.addr, err)
1429 return nil, err
1430 }
1431
1432 b.updateIncomingCommunicationMetrics(n+8, time.Since(requestTime))
1433 res := &ApiVersionsResponse{Version: rb.version()}
1434 err = versionedDecode(payload, res, rb.version(), b.metricRegistry)
1435 if err != nil {
1436 Logger.Printf("Failed to parse ApiVersionsResponse V%d from %s: %s\n", v, b.addr, err)
1437 return nil, err
1438 }
1439
1440 kerr := KError(res.ErrorCode)
1441 if kerr != ErrNoError {

Callers 1

OpenMethod · 0.95

Calls 12

writeMethod · 0.95
readFullMethod · 0.95
versionMethod · 0.95
versionFunction · 0.85
encodeFunction · 0.85
versionedDecodeFunction · 0.85
KErrorTypeAlias · 0.85
PrintfMethod · 0.65
ErrorfMethod · 0.65

Tested by

no test coverage detected