| 84 | } |
| 85 | |
| 86 | func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error { |
| 87 | apiKey := msg.ApiKey() |
| 88 | |
| 89 | if i := int(apiKey); i < 0 || i >= len(apiTypes) { |
| 90 | return fmt.Errorf("unsupported api key: %d", i) |
| 91 | } |
| 92 | |
| 93 | t := &apiTypes[apiKey] |
| 94 | if t == nil { |
| 95 | return fmt.Errorf("unsupported api: %s", apiNames[apiKey]) |
| 96 | } |
| 97 | |
| 98 | if typedMessage, ok := msg.(OverrideTypeMessage); ok { |
| 99 | typeKey := typedMessage.TypeKey() |
| 100 | overrideType := overrideApiTypes[apiKey][typeKey] |
| 101 | t = &overrideType |
| 102 | } |
| 103 | |
| 104 | minVersion := t.minVersion() |
| 105 | maxVersion := t.maxVersion() |
| 106 | |
| 107 | if apiVersion < minVersion || apiVersion > maxVersion { |
| 108 | return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion) |
| 109 | } |
| 110 | |
| 111 | r := &t.responses[apiVersion-minVersion] |
| 112 | v := valueOf(msg) |
| 113 | b := newPageBuffer() |
| 114 | defer b.unref() |
| 115 | |
| 116 | e := &encoder{writer: b} |
| 117 | e.writeInt32(0) // placeholder for the response size |
| 118 | e.writeInt32(correlationID) |
| 119 | if r.flexible { |
| 120 | // Flexible messages use extra space for a tag buffer, |
| 121 | // which begins with a size value. Since we're not writing any fields into the |
| 122 | // latter, we can just write zero for now. |
| 123 | // |
| 124 | // See |
| 125 | // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields |
| 126 | // for details. |
| 127 | e.writeUnsignedVarInt(0) |
| 128 | } |
| 129 | r.encode(e, v) |
| 130 | err := e.err |
| 131 | |
| 132 | if err == nil { |
| 133 | size := packUint32(uint32(b.Size()) - 4) |
| 134 | b.WriteAt(size[:], 0) |
| 135 | _, err = b.WriteTo(w) |
| 136 | } |
| 137 | |
| 138 | return err |
| 139 | } |
| 140 | |
| 141 | const ( |
| 142 | tlsAlertByte byte = 0x15 |