MCPcopy
hub / github.com/IBM/sarama / handleError

Method handleError

async_producer.go:1511–1569  ·  view source on GitHub ↗
(sent *produceSet, err error)

Source from the content-addressed store, hash-verified

1509}
1510
1511func (bp *brokerProducer) handleError(sent *produceSet, err error) {
1512 var target PacketEncodingError
1513 if errors.As(err, &target) {
1514 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1515 bp.parent.returnErrors(pSet.msgs, err)
1516 })
1517 bp.parent.muter.unmute(sent)
1518 } else {
1519 Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
1520 bp.parent.abandonBrokerConnection(bp.broker)
1521 _ = bp.broker.Close()
1522 bp.closing = err
1523 var retryTopics []string
1524 retryTopicSeen := make(map[string]struct{})
1525 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1526 if _, ok := retryTopicSeen[topic]; ok {
1527 return
1528 }
1529 retryTopicSeen[topic] = struct{}{}
1530 retryTopics = append(retryTopics, topic)
1531 })
1532 if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 {
1533 refreshErr := bp.parent.client.RefreshMetadata(retryTopics...)
1534 if refreshErr != nil {
1535 Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr)
1536 }
1537 }
1538 keepMuted := make(map[string]map[int32]struct{})
1539 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1540 // keep partition marked as in-flight during retry (connection error)
1541 if bp.currentRetries[topic] == nil {
1542 bp.currentRetries[topic] = make(map[int32]error)
1543 }
1544 bp.currentRetries[topic][partition] = err
1545 if bp.parent.conf.Producer.Idempotent {
1546 if keepMuted[topic] == nil {
1547 keepMuted[topic] = make(map[int32]struct{})
1548 }
1549 keepMuted[topic][partition] = struct{}{}
1550 go bp.parent.retryBatch(topic, partition, pSet, err, true)
1551 } else {
1552 bp.parent.retryMessages(pSet.msgs, err)
1553 }
1554 })
1555 bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1556 bp.parent.retryMessages(pSet.msgs, err)
1557 })
1558 bp.rollOver()
1559
1560 unmuteSet := sent.copyFunc(func(topic string, partition int32) bool {
1561 if partitions := keepMuted[topic]; partitions != nil {
1562 _, kept := partitions[partition]
1563 return !kept
1564 }
1565 return true
1566 })
1567 bp.parent.muter.unmute(unmuteSet)
1568 }

Callers 1

handleResponseMethod · 0.95

Calls 12

rollOverMethod · 0.95
eachPartitionMethod · 0.80
returnErrorsMethod · 0.80
unmuteMethod · 0.80
IDMethod · 0.80
retryBatchMethod · 0.80
retryMessagesMethod · 0.80
copyFuncMethod · 0.80
PrintfMethod · 0.65
CloseMethod · 0.65
RefreshMetadataMethod · 0.65

Tested by

no test coverage detected