(pe packetEncoder)
| 111 | ) |
| 112 | |
| 113 | func (r *FetchRequest) encode(pe packetEncoder) (err error) { |
| 114 | metricRegistry := pe.metricRegistry() |
| 115 | |
| 116 | pe.putInt32(-1) // ReplicaID is always -1 for clients |
| 117 | pe.putInt32(r.MaxWaitTime) |
| 118 | pe.putInt32(r.MinBytes) |
| 119 | if r.Version >= 3 { |
| 120 | pe.putInt32(r.MaxBytes) |
| 121 | } |
| 122 | if r.Version >= 4 { |
| 123 | pe.putInt8(int8(r.Isolation)) |
| 124 | } |
| 125 | if r.Version >= 7 { |
| 126 | pe.putInt32(r.SessionID) |
| 127 | pe.putInt32(r.SessionEpoch) |
| 128 | } |
| 129 | err = pe.putArrayLength(len(r.blocks)) |
| 130 | if err != nil { |
| 131 | return err |
| 132 | } |
| 133 | for topic, blocks := range r.blocks { |
| 134 | err = pe.putString(topic) |
| 135 | if err != nil { |
| 136 | return err |
| 137 | } |
| 138 | err = pe.putArrayLength(len(blocks)) |
| 139 | if err != nil { |
| 140 | return err |
| 141 | } |
| 142 | for partition, block := range blocks { |
| 143 | pe.putInt32(partition) |
| 144 | err = block.encode(pe, r.Version) |
| 145 | if err != nil { |
| 146 | return err |
| 147 | } |
| 148 | } |
| 149 | pe.putEmptyTaggedFieldArray() |
| 150 | getOrRegisterTopicMeter("consumer-fetch-rate", topic, metricRegistry).Mark(1) |
| 151 | } |
| 152 | if r.Version >= 7 { |
| 153 | err = pe.putArrayLength(len(r.forgotten)) |
| 154 | if err != nil { |
| 155 | return err |
| 156 | } |
| 157 | for topic, partitions := range r.forgotten { |
| 158 | err = pe.putString(topic) |
| 159 | if err != nil { |
| 160 | return err |
| 161 | } |
| 162 | err = pe.putInt32Array(partitions) |
| 163 | if err != nil { |
| 164 | return err |
| 165 | } |
| 166 | pe.putEmptyTaggedFieldArray() |
| 167 | } |
| 168 | } |
| 169 | if r.Version >= 11 { |
| 170 | err = pe.putString(r.RackID) |
nothing calls this directly
no test coverage detected