TestPartitionConsumerDispatcherOrphanedClose verifies the dispatcher exits after AsyncClose when the brokerConsumer has detached this child from its subscriptions and there is a pending redispatch on the trigger channel.
(t *testing.T)
| 1724 | // TestPartitionConsumerDispatcherOrphanedClose verifies the dispatcher exits |
| 1725 | // after AsyncClose when the brokerConsumer has detached this child from its |
| 1726 | // subscriptions and there is a pending redispatch on the trigger channel. |
| 1727 | func TestPartitionConsumerDispatcherOrphanedClose(t *testing.T) { |
| 1728 | newOrphan := func() *partitionConsumer { |
| 1729 | config := NewTestConfig() |
| 1730 | config.Consumer.Retry.Backoff = time.Hour |
| 1731 | config.Consumer.MaxWaitTime = 50 * time.Millisecond |
| 1732 | |
| 1733 | c := &consumer{ |
| 1734 | conf: config, |
| 1735 | children: make(map[string]map[int32]*partitionConsumer), |
| 1736 | brokerConsumers: make(map[*Broker]*brokerConsumer), |
| 1737 | } |
| 1738 | bc := &brokerConsumer{consumer: c, input: make(chan *brokerSubscription), refs: 1, stop: make(chan none)} |
| 1739 | child := &partitionConsumer{ |
| 1740 | consumer: c, |
| 1741 | conf: config, |
| 1742 | broker: bc, |
| 1743 | feeder: make(chan *partitionConsumerResponse, 1), |
| 1744 | trigger: make(chan none, 1), |
| 1745 | dying: make(chan none), |
| 1746 | dispatcherStop: make(chan none), |
| 1747 | } |
| 1748 | child.brokerSubscription = newBrokerSubscription(child) |
| 1749 | return child |
| 1750 | } |
| 1751 | |
| 1752 | runDispatcher := func(t *testing.T, child *partitionConsumer) { |
| 1753 | t.Helper() |
| 1754 | done := make(chan none) |
| 1755 | go func() { |
| 1756 | defer close(done) |
| 1757 | child.dispatcher() |
| 1758 | }() |
| 1759 | select { |
| 1760 | case <-done: |
| 1761 | case <-time.After(2 * time.Second): |
| 1762 | require.FailNow(t, "dispatcher hung after AsyncClose on orphaned child") |
| 1763 | } |
| 1764 | } |
| 1765 | |
| 1766 | t.Run("subscription released before dying", func(t *testing.T) { |
| 1767 | child := newOrphan() |
| 1768 | child.brokerSubscription.release() |
| 1769 | child.triggerRedispatch() |
| 1770 | close(child.dying) |
| 1771 | runDispatcher(t, child) |
| 1772 | }) |
| 1773 | |
| 1774 | t.Run("subscription released after dying", func(t *testing.T) { |
| 1775 | child := newOrphan() |
| 1776 | child.triggerRedispatch() |
| 1777 | close(child.dying) |
| 1778 | child.brokerSubscription.release() |
| 1779 | runDispatcher(t, child) |
| 1780 | }) |
| 1781 | |
| 1782 | t.Run("stopDispatcher unblocks waitForBrokerHandover", func(t *testing.T) { |
| 1783 | child := newOrphan() |
| 1784 | child.triggerRedispatch() |
nothing calls this directly
no test coverage detected