(sent *produceSet, err error)
| 1509 | } |
| 1510 | |
| 1511 | func (bp *brokerProducer) handleError(sent *produceSet, err error) { |
| 1512 | var target PacketEncodingError |
| 1513 | if errors.As(err, &target) { |
| 1514 | sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { |
| 1515 | bp.parent.returnErrors(pSet.msgs, err) |
| 1516 | }) |
| 1517 | bp.parent.muter.unmute(sent) |
| 1518 | } else { |
| 1519 | Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err) |
| 1520 | bp.parent.abandonBrokerConnection(bp.broker) |
| 1521 | _ = bp.broker.Close() |
| 1522 | bp.closing = err |
| 1523 | var retryTopics []string |
| 1524 | retryTopicSeen := make(map[string]struct{}) |
| 1525 | sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { |
| 1526 | if _, ok := retryTopicSeen[topic]; ok { |
| 1527 | return |
| 1528 | } |
| 1529 | retryTopicSeen[topic] = struct{}{} |
| 1530 | retryTopics = append(retryTopics, topic) |
| 1531 | }) |
| 1532 | if bp.parent.conf.Producer.Idempotent && len(retryTopics) > 0 { |
| 1533 | refreshErr := bp.parent.client.RefreshMetadata(retryTopics...) |
| 1534 | if refreshErr != nil { |
| 1535 | Logger.Printf("Failed refreshing metadata because of %v\n", refreshErr) |
| 1536 | } |
| 1537 | } |
| 1538 | keepMuted := make(map[string]map[int32]struct{}) |
| 1539 | sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) { |
| 1540 | // keep partition marked as in-flight during retry (connection error) |
| 1541 | if bp.currentRetries[topic] == nil { |
| 1542 | bp.currentRetries[topic] = make(map[int32]error) |
| 1543 | } |
| 1544 | bp.currentRetries[topic][partition] = err |
| 1545 | if bp.parent.conf.Producer.Idempotent { |
| 1546 | if keepMuted[topic] == nil { |
| 1547 | keepMuted[topic] = make(map[int32]struct{}) |
| 1548 | } |
| 1549 | keepMuted[topic][partition] = struct{}{} |
| 1550 | go bp.parent.retryBatch(topic, partition, pSet, err, true) |
| 1551 | } else { |
| 1552 | bp.parent.retryMessages(pSet.msgs, err) |
| 1553 | } |
| 1554 | }) |
| 1555 | bp.accumulatingBatch.eachPartition(func(topic string, partition int32, pSet *partitionSet) { |
| 1556 | bp.parent.retryMessages(pSet.msgs, err) |
| 1557 | }) |
| 1558 | bp.rollOver() |
| 1559 | |
| 1560 | unmuteSet := sent.copyFunc(func(topic string, partition int32) bool { |
| 1561 | if partitions := keepMuted[topic]; partitions != nil { |
| 1562 | _, kept := partitions[partition] |
| 1563 | return !kept |
| 1564 | } |
| 1565 | return true |
| 1566 | }) |
| 1567 | bp.parent.muter.unmute(unmuteSet) |
| 1568 | } |
no test coverage detected