| 311 | } |
| 312 | |
| 313 | func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { |
| 314 | version := 1 |
| 315 | if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) { |
| 316 | version = 2 |
| 317 | } |
| 318 | |
| 319 | switch { |
| 320 | // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. |
| 321 | case ps.bufferBytes+msg.ByteSize(version) >= int(MaxRequestSize-(10*1024)): |
| 322 | return true |
| 323 | // Would we overflow the size-limit of a message-batch for this partition? |
| 324 | case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && |
| 325 | ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.ByteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: |
| 326 | return true |
| 327 | // Would we overflow simply in number of messages? |
| 328 | case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: |
| 329 | return true |
| 330 | default: |
| 331 | return false |
| 332 | } |
| 333 | } |
| 334 | |
| 335 | func (ps *produceSet) readyToFlush() bool { |
| 336 | switch { |