| 6 | ) |
| 7 | |
| 8 | func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error) { |
| 9 | d := &decoder{reader: r, remain: 4} |
| 10 | size := d.readInt32() |
| 11 | |
| 12 | if err = d.err; err != nil { |
| 13 | err = dontExpectEOF(err) |
| 14 | return |
| 15 | } |
| 16 | |
| 17 | d.remain = int(size) |
| 18 | apiKey := ApiKey(d.readInt16()) |
| 19 | apiVersion = d.readInt16() |
| 20 | correlationID = d.readInt32() |
| 21 | clientID = d.readString() |
| 22 | |
| 23 | if i := int(apiKey); i < 0 || i >= len(apiTypes) { |
| 24 | err = fmt.Errorf("unsupported api key: %d", i) |
| 25 | return |
| 26 | } |
| 27 | |
| 28 | if err = d.err; err != nil { |
| 29 | err = dontExpectEOF(err) |
| 30 | return |
| 31 | } |
| 32 | |
| 33 | t := &apiTypes[apiKey] |
| 34 | if t == nil { |
| 35 | err = fmt.Errorf("unsupported api: %s", apiNames[apiKey]) |
| 36 | return |
| 37 | } |
| 38 | |
| 39 | minVersion := t.minVersion() |
| 40 | maxVersion := t.maxVersion() |
| 41 | |
| 42 | if apiVersion < minVersion || apiVersion > maxVersion { |
| 43 | err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) |
| 44 | return |
| 45 | } |
| 46 | |
| 47 | req := &t.requests[apiVersion-minVersion] |
| 48 | |
| 49 | if req.flexible { |
| 50 | // In the flexible case, there's a tag buffer at the end of the request header |
| 51 | taggedCount := int(d.readUnsignedVarInt()) |
| 52 | for i := 0; i < taggedCount; i++ { |
| 53 | d.readUnsignedVarInt() // tagID |
| 54 | size := d.readUnsignedVarInt() |
| 55 | |
| 56 | // Just throw away the values for now |
| 57 | d.read(int(size)) |
| 58 | } |
| 59 | } |
| 60 | |
| 61 | msg = req.new() |
| 62 | req.decode(d, valueOf(msg)) |
| 63 | d.discardAll() |
| 64 | |
| 65 | if err = d.err; err != nil { |