Singleton: effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock. Based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
()
| 1572 | // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock |
| 1573 | // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel |
| 1574 | func (p *asyncProducer) retryHandler() { |
| 1575 | maxBufferLength := p.conf.Producer.Retry.MaxBufferLength |
| 1576 | if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength { |
| 1577 | maxBufferLength = minFunctionalRetryBufferLength |
| 1578 | } |
| 1579 | |
| 1580 | maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes |
| 1581 | if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes { |
| 1582 | maxBufferBytes = minFunctionalRetryBufferBytes |
| 1583 | } |
| 1584 | |
| 1585 | version := 1 |
| 1586 | if p.conf.Version.IsAtLeast(V0_11_0_0) { |
| 1587 | version = 2 |
| 1588 | } |
| 1589 | |
| 1590 | var currentByteSize int64 |
| 1591 | var msg *ProducerMessage |
| 1592 | var buf queue.Queue[*ProducerMessage] |
| 1593 | |
| 1594 | for { |
| 1595 | if buf.Length() == 0 { |
| 1596 | msg = <-p.retries |
| 1597 | } else { |
| 1598 | select { |
| 1599 | case msg = <-p.retries: |
| 1600 | case p.input <- buf.Peek(): |
| 1601 | msgToRemove := buf.Remove() |
| 1602 | currentByteSize -= int64(msgToRemove.ByteSize(version)) |
| 1603 | continue |
| 1604 | } |
| 1605 | } |
| 1606 | |
| 1607 | if msg == nil { |
| 1608 | return |
| 1609 | } |
| 1610 | |
| 1611 | buf.Add(msg) |
| 1612 | currentByteSize += int64(msg.ByteSize(version)) |
| 1613 | |
| 1614 | if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) { |
| 1615 | continue |
| 1616 | } |
| 1617 | |
| 1618 | msgToHandle := buf.Peek() |
| 1619 | if msgToHandle.flags == 0 { |
| 1620 | select { |
| 1621 | case p.input <- msgToHandle: |
| 1622 | buf.Remove() |
| 1623 | currentByteSize -= int64(msgToHandle.ByteSize(version)) |
| 1624 | default: |
| 1625 | buf.Remove() |
| 1626 | currentByteSize -= int64(msgToHandle.ByteSize(version)) |
| 1627 | p.returnError(msgToHandle, ErrProducerRetryBufferOverflow) |
| 1628 | } |
| 1629 | } |
| 1630 | } |
| 1631 | } |