| 333 | } |
| 334 | |
| 335 | func (ps *produceSet) readyToFlush() bool { |
| 336 | switch { |
| 337 | // If we don't have any messages, nothing else matters |
| 338 | case ps.empty(): |
| 339 | return false |
| 340 | // If all three config values are 0, we always flush as-fast-as-possible |
| 341 | case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0: |
| 342 | return true |
| 343 | // If we've passed the message trigger-point |
| 344 | case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages: |
| 345 | return true |
| 346 | // If we've passed the byte trigger-point |
| 347 | case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes: |
| 348 | return true |
| 349 | default: |
| 350 | return false |
| 351 | } |
| 352 | } |
| 353 | |
| 354 | func (ps *produceSet) empty() bool { |
| 355 | return ps.bufferCount == 0 |