MCPcopy
hub / github.com/segmentio/kafka-go / WriteRequest

Function WriteRequest

protocol/request.go:72–134  ·  view source on GitHub ↗
(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message)

Source from the content-addressed store, hash-verified

70}
71
72func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error {
73 apiKey := msg.ApiKey()
74
75 if i := int(apiKey); i < 0 || i >= len(apiTypes) {
76 return fmt.Errorf("unsupported api key: %d", i)
77 }
78
79 t := &apiTypes[apiKey]
80 if t == nil {
81 return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
82 }
83
84 if typedMessage, ok := msg.(OverrideTypeMessage); ok {
85 typeKey := typedMessage.TypeKey()
86 overrideType := overrideApiTypes[apiKey][typeKey]
87 t = &overrideType
88 }
89
90 minVersion := t.minVersion()
91 maxVersion := t.maxVersion()
92
93 if apiVersion < minVersion || apiVersion > maxVersion {
94 return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
95 }
96
97 r := &t.requests[apiVersion-minVersion]
98 v := valueOf(msg)
99 b := newPageBuffer()
100 defer b.unref()
101
102 e := &encoder{writer: b}
103 e.writeInt32(0) // placeholder for the request size
104 e.writeInt16(int16(apiKey))
105 e.writeInt16(apiVersion)
106 e.writeInt32(correlationID)
107
108 if r.flexible {
109 // Flexible messages use a nullable string for the client ID, then extra space for a
110 // tag buffer, which begins with a size value. Since we're not writing any fields into the
111 // latter, we can just write zero for now.
112 //
113 // See
114 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
115 // for details.
116 e.writeNullString(clientID)
117 e.writeUnsignedVarInt(0)
118 } else {
119 // Technically, recent versions of kafka interpret this field as a nullable
120 // string, however kafka 0.10 expected a non-nullable string and fails with
121 // a NullPointerException when it receives a null client id.
122 e.writeString(clientID)
123 }
124 r.encode(e, v)
125 err := e.err
126
127 if err == nil {
128 size := packUint32(uint32(b.Size()) - 4)
129 b.WriteAt(size[:], 0)

Callers 4

TestRequest (Function) · 0.92
TestRequestWithOverride (Function) · 0.92
BenchmarkRequest (Function) · 0.92
RoundTrip (Function) · 0.85

Calls 15

writeInt32 (Method) · 0.95
writeInt16 (Method) · 0.95
writeNullString (Method) · 0.95
writeUnsignedVarInt (Method) · 0.95
writeString (Method) · 0.95
newPageBuffer (Function) · 0.85
packUint32 (Function) · 0.85
minVersion (Method) · 0.80
maxVersion (Method) · 0.80
valueOf (Function) · 0.70
ApiKey (Method) · 0.65
TypeKey (Method) · 0.65

Tested by 2

TestRequest (Function) · 0.74
TestRequestWithOverride (Function) · 0.74