TestBrokerProducerShutdown ensures that a call to shutdown stops the brokerProducer run() loop and doesn't leak any goroutines. (nolint:paralleltest)
(t *testing.T)
| 1713 | // |
| 1714 | //nolint:paralleltest |
| 1715 | func TestBrokerProducerShutdown(t *testing.T) { |
| 1716 | defer goleak.VerifyNone(t, |
| 1717 | goleak.IgnoreCurrent(), |
| 1718 | goleak.IgnoreTopFunction("github.com/rcrowley/go-metrics.(*meterArbiter).tick"), |
| 1719 | ) |
| 1720 | |
| 1721 | mockBroker := NewMockBroker(t, 1) |
| 1722 | metadataResponse := &MetadataResponse{} |
| 1723 | metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) |
| 1724 | metadataResponse.AddTopicPartition( |
| 1725 | "my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError) |
| 1726 | mockBroker.Returns(metadataResponse) |
| 1727 | |
| 1728 | producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, NewTestConfig()) |
| 1729 | if err != nil { |
| 1730 | t.Fatal(err) |
| 1731 | } |
| 1732 | broker := &Broker{ |
| 1733 | addr: mockBroker.Addr(), |
| 1734 | id: mockBroker.BrokerID(), |
| 1735 | } |
| 1736 | // Starts various goroutines in newBrokerProducer |
| 1737 | bp := producer.(*asyncProducer).getBrokerProducer(broker) |
| 1738 | // Initiate the shutdown of all of them |
| 1739 | producer.(*asyncProducer).unrefBrokerProducer(broker, bp) |
| 1740 | |
| 1741 | _ = producer.Close() |
| 1742 | mockBroker.Close() |
| 1743 | } |
| 1744 | |
| 1745 | // TestBrokerProducerWaitForSpaceEmptyBufferRollover ensures forced rollovers with an empty buffer |
| 1746 | // do not deadlock waiting for responses when no partitions are muted. |
nothing calls this directly
no test coverage detected