| 9 | ) |
| 10 | |
| 11 | func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) { |
| 12 | if i := int(apiKey); i < 0 || i >= len(apiTypes) { |
| 13 | err = fmt.Errorf("unsupported api key: %d", i) |
| 14 | return |
| 15 | } |
| 16 | |
| 17 | t := &apiTypes[apiKey] |
| 18 | if t == nil { |
| 19 | err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) |
| 20 | return |
| 21 | } |
| 22 | |
| 23 | minVersion := t.minVersion() |
| 24 | maxVersion := t.maxVersion() |
| 25 | |
| 26 | if apiVersion < minVersion || apiVersion > maxVersion { |
| 27 | err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) |
| 28 | return |
| 29 | } |
| 30 | |
| 31 | d := &decoder{reader: r, remain: 4} |
| 32 | size := d.readInt32() |
| 33 | |
| 34 | if err = d.err; err != nil { |
| 35 | err = dontExpectEOF(err) |
| 36 | return |
| 37 | } |
| 38 | |
| 39 | d.remain = int(size) |
| 40 | correlationID = d.readInt32() |
| 41 | if err = d.err; err != nil { |
| 42 | if errors.Is(err, io.ErrUnexpectedEOF) { |
| 43 | // If a Writer/Reader is configured without TLS and connects |
| 44 | // to a broker expecting TLS the only message we return to the |
| 45 | // caller is io.ErrUnexpetedEOF which is opaque. This section |
| 46 | // tries to determine if that's what has happened. |
| 47 | // We first deconstruct the initial 4 bytes of the message |
| 48 | // from the size which was read earlier. |
| 49 | // Next, we examine those bytes to see if they looks like a TLS |
| 50 | // error message. If they do we wrap the io.ErrUnexpectedEOF |
| 51 | // with some context. |
| 52 | if looksLikeUnexpectedTLS(size) { |
| 53 | err = fmt.Errorf("%w: broker appears to be expecting TLS", io.ErrUnexpectedEOF) |
| 54 | } |
| 55 | return |
| 56 | } |
| 57 | err = dontExpectEOF(err) |
| 58 | return |
| 59 | } |
| 60 | |
| 61 | res := &t.responses[apiVersion-minVersion] |
| 62 | |
| 63 | if res.flexible { |
| 64 | // In the flexible case, there's a tag buffer at the end of the response header |
| 65 | taggedCount := int(d.readUnsignedVarInt()) |
| 66 | for i := 0; i < taggedCount; i++ { |
| 67 | d.readUnsignedVarInt() // tagID |
| 68 | size := d.readUnsignedVarInt() |