| 70 | } |
| 71 | |
| 72 | func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error { |
| 73 | apiKey := msg.ApiKey() |
| 74 | |
| 75 | if i := int(apiKey); i < 0 || i >= len(apiTypes) { |
| 76 | return fmt.Errorf("unsupported api key: %d", i) |
| 77 | } |
| 78 | |
| 79 | t := &apiTypes[apiKey] |
| 80 | if t == nil { |
| 81 | return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) |
| 82 | } |
| 83 | |
| 84 | if typedMessage, ok := msg.(OverrideTypeMessage); ok { |
| 85 | typeKey := typedMessage.TypeKey() |
| 86 | overrideType := overrideApiTypes[apiKey][typeKey] |
| 87 | t = &overrideType |
| 88 | } |
| 89 | |
| 90 | minVersion := t.minVersion() |
| 91 | maxVersion := t.maxVersion() |
| 92 | |
| 93 | if apiVersion < minVersion || apiVersion > maxVersion { |
| 94 | return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) |
| 95 | } |
| 96 | |
| 97 | r := &t.requests[apiVersion-minVersion] |
| 98 | v := valueOf(msg) |
| 99 | b := newPageBuffer() |
| 100 | defer b.unref() |
| 101 | |
| 102 | e := &encoder{writer: b} |
| 103 | e.writeInt32(0) // placeholder for the request size |
| 104 | e.writeInt16(int16(apiKey)) |
| 105 | e.writeInt16(apiVersion) |
| 106 | e.writeInt32(correlationID) |
| 107 | |
| 108 | if r.flexible { |
| 109 | // Flexible messages use a nullable string for the client ID, then extra space for a |
| 110 | // tag buffer, which begins with a size value. Since we're not writing any fields into the |
| 111 | // latter, we can just write zero for now. |
| 112 | // |
| 113 | // See |
| 114 | // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields |
| 115 | // for details. |
| 116 | e.writeNullString(clientID) |
| 117 | e.writeUnsignedVarInt(0) |
| 118 | } else { |
| 119 | // Technically, recent versions of kafka interpret this field as a nullable |
| 120 | // string, however kafka 0.10 expected a non-nullable string and fails with |
| 121 | // a NullPointerException when it receives a null client id. |
| 122 | e.writeString(clientID) |
| 123 | } |
| 124 | r.encode(e, v) |
| 125 | err := e.err |
| 126 | |
| 127 | if err == nil { |
| 128 | size := packUint32(uint32(b.Size()) - 4) |
| 129 | b.WriteAt(size[:], 0) |