MCPcopy
hub / github.com/IBM/sarama / retryHandler

Method retryHandler

async_producer.go:1574–1631  ·  view source on GitHub ↗

Singleton. Effectively a "bridge" between the flushers and the dispatcher, used to avoid deadlock; based on https://godoc.org/github.com/eapache/channels#InfiniteChannel

()

Source from the content-addressed store, hash-verified

1572// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
1573// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
1574func (p *asyncProducer) retryHandler() {
1575 maxBufferLength := p.conf.Producer.Retry.MaxBufferLength
1576 if 0 < maxBufferLength && maxBufferLength < minFunctionalRetryBufferLength {
1577 maxBufferLength = minFunctionalRetryBufferLength
1578 }
1579
1580 maxBufferBytes := p.conf.Producer.Retry.MaxBufferBytes
1581 if 0 < maxBufferBytes && maxBufferBytes < minFunctionalRetryBufferBytes {
1582 maxBufferBytes = minFunctionalRetryBufferBytes
1583 }
1584
1585 version := 1
1586 if p.conf.Version.IsAtLeast(V0_11_0_0) {
1587 version = 2
1588 }
1589
1590 var currentByteSize int64
1591 var msg *ProducerMessage
1592 var buf queue.Queue[*ProducerMessage]
1593
1594 for {
1595 if buf.Length() == 0 {
1596 msg = <-p.retries
1597 } else {
1598 select {
1599 case msg = <-p.retries:
1600 case p.input <- buf.Peek():
1601 msgToRemove := buf.Remove()
1602 currentByteSize -= int64(msgToRemove.ByteSize(version))
1603 continue
1604 }
1605 }
1606
1607 if msg == nil {
1608 return
1609 }
1610
1611 buf.Add(msg)
1612 currentByteSize += int64(msg.ByteSize(version))
1613
1614 if (maxBufferLength <= 0 || buf.Length() < maxBufferLength) && (maxBufferBytes <= 0 || currentByteSize < maxBufferBytes) {
1615 continue
1616 }
1617
1618 msgToHandle := buf.Peek()
1619 if msgToHandle.flags == 0 {
1620 select {
1621 case p.input <- msgToHandle:
1622 buf.Remove()
1623 currentByteSize -= int64(msgToHandle.ByteSize(version))
1624 default:
1625 buf.Remove()
1626 currentByteSize -= int64(msgToHandle.ByteSize(version))
1627 p.returnError(msgToHandle, ErrProducerRetryBufferOverflow)
1628 }
1629 }
1630 }
1631}

Callers

nothing calls this directly

Calls 7

ByteSizeMethod · 0.95
returnErrorMethod · 0.95
IsAtLeastMethod · 0.80
PeekMethod · 0.80
RemoveMethod · 0.80
LengthMethod · 0.65
AddMethod · 0.45

Tested by

no test coverage detected