MCPcopy
hub / github.com/segmentio/kafka-go / ReadResponse

Function ReadResponse

protocol/response.go:11–84  ·  view source on GitHub ↗
(r io.Reader, apiKey ApiKey, apiVersion int16)

Source from the content-addressed store, hash-verified

9)
10
11func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) {
12 if i := int(apiKey); i < 0 || i >= len(apiTypes) {
13 err = fmt.Errorf("unsupported api key: %d", i)
14 return
15 }
16
17 t := &apiTypes[apiKey]
18 if t == nil {
19 err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
20 return
21 }
22
23 minVersion := t.minVersion()
24 maxVersion := t.maxVersion()
25
26 if apiVersion < minVersion || apiVersion > maxVersion {
27 err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
28 return
29 }
30
31 d := &decoder{reader: r, remain: 4}
32 size := d.readInt32()
33
34 if err = d.err; err != nil {
35 err = dontExpectEOF(err)
36 return
37 }
38
39 d.remain = int(size)
40 correlationID = d.readInt32()
41 if err = d.err; err != nil {
42 if errors.Is(err, io.ErrUnexpectedEOF) {
43 // If a Writer/Reader is configured without TLS and connects
44 // to a broker expecting TLS the only message we return to the
45 // caller is io.ErrUnexpectedEOF which is opaque. This section
46 // tries to determine if that's what has happened.
47 // We first deconstruct the initial 4 bytes of the message
48 // from the size which was read earlier.
49 // Next, we examine those bytes to see if they look like a TLS
50 // error message. If they do we wrap the io.ErrUnexpectedEOF
51 // with some context.
52 if looksLikeUnexpectedTLS(size) {
53 err = fmt.Errorf("%w: broker appears to be expecting TLS", io.ErrUnexpectedEOF)
54 }
55 return
56 }
57 err = dontExpectEOF(err)
58 return
59 }
60
61 res := &t.responses[apiVersion-minVersion]
62
63 if res.flexible {
64 // In the flexible case, there's a tag buffer at the end of the response header
65 taggedCount := int(d.readUnsignedVarInt())
66 for i := 0; i < taggedCount; i++ {
67 d.readUnsignedVarInt() // tagID
68 size := d.readUnsignedVarInt()

Callers 4

TestResponseFunction · 0.92
BenchmarkResponseFunction · 0.92
RoundTripFunction · 0.85

Calls 10

readInt32Method · 0.95
readUnsignedVarIntMethod · 0.95
readMethod · 0.95
discardAllMethod · 0.95
looksLikeUnexpectedTLSFunction · 0.85
minVersionMethod · 0.80
maxVersionMethod · 0.80
newMethod · 0.80
dontExpectEOFFunction · 0.70
valueOfFunction · 0.70

Tested by 2

TestResponseFunction · 0.74