MCPcopy
hub / github.com/IBM/sarama / TestPartitionConsumerDispatcherOrphanedClose

Function TestPartitionConsumerDispatcherOrphanedClose

consumer_test.go:1727–1789  ·  view source on GitHub ↗

TestPartitionConsumerDispatcherOrphanedClose verifies the dispatcher exits after AsyncClose when the brokerConsumer has detached this child from its subscriptions and there is a pending redispatch on the trigger channel.

(t *testing.T)

Source from the content-addressed store, hash-verified

1725// after AsyncClose when the brokerConsumer has detached this child from its
1726// subscriptions and there is a pending redispatch on the trigger channel.
1727func TestPartitionConsumerDispatcherOrphanedClose(t *testing.T) {
1728 newOrphan := func() *partitionConsumer {
1729 config := NewTestConfig()
1730 config.Consumer.Retry.Backoff = time.Hour
1731 config.Consumer.MaxWaitTime = 50 * time.Millisecond
1732
1733 c := &consumer{
1734 conf: config,
1735 children: make(map[string]map[int32]*partitionConsumer),
1736 brokerConsumers: make(map[*Broker]*brokerConsumer),
1737 }
1738 bc := &brokerConsumer{consumer: c, input: make(chan *brokerSubscription), refs: 1, stop: make(chan none)}
1739 child := &partitionConsumer{
1740 consumer: c,
1741 conf: config,
1742 broker: bc,
1743 feeder: make(chan *partitionConsumerResponse, 1),
1744 trigger: make(chan none, 1),
1745 dying: make(chan none),
1746 dispatcherStop: make(chan none),
1747 }
1748 child.brokerSubscription = newBrokerSubscription(child)
1749 return child
1750 }
1751
1752 runDispatcher := func(t *testing.T, child *partitionConsumer) {
1753 t.Helper()
1754 done := make(chan none)
1755 go func() {
1756 defer close(done)
1757 child.dispatcher()
1758 }()
1759 select {
1760 case <-done:
1761 case <-time.After(2 * time.Second):
1762 require.FailNow(t, "dispatcher hung after AsyncClose on orphaned child")
1763 }
1764 }
1765
1766 t.Run("subscription released before dying", func(t *testing.T) {
1767 child := newOrphan()
1768 child.brokerSubscription.release()
1769 child.triggerRedispatch()
1770 close(child.dying)
1771 runDispatcher(t, child)
1772 })
1773
1774 t.Run("subscription released after dying", func(t *testing.T) {
1775 child := newOrphan()
1776 child.triggerRedispatch()
1777 close(child.dying)
1778 child.brokerSubscription.release()
1779 runDispatcher(t, child)
1780 })
1781
1782 t.Run("stopDispatcher unblocks waitForBrokerHandover", func(t *testing.T) {
1783 child := newOrphan()
1784 child.triggerRedispatch()

Callers

nothing calls this directly

Calls 8

dispatcher (Method) · 0.95
triggerRedispatch (Method) · 0.95
stopDispatcher (Method) · 0.95
newBrokerSubscription (Function) · 0.85
Helper (Method) · 0.80
Run (Method) · 0.80
NewTestConfig (Function) · 0.70
release (Method) · 0.45

Tested by

no test coverage detected