hub / github.com/IBM/sarama / TestBrokerProducerShutdown

Function TestBrokerProducerShutdown

async_producer_test.go:1715–1743

TestBrokerProducerShutdown ensures that a call to shutdown stops the brokerProducer run() loop and doesn't leak any goroutines.

(t *testing.T)

Source from the content-addressed store, hash-verified

//
//nolint:paralleltest
func TestBrokerProducerShutdown(t *testing.T) {
	defer goleak.VerifyNone(t,
		goleak.IgnoreCurrent(),
		goleak.IgnoreTopFunction("github.com/rcrowley/go-metrics.(*meterArbiter).tick"),
	)

	mockBroker := NewMockBroker(t, 1)
	metadataResponse := &MetadataResponse{}
	metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID())
	metadataResponse.AddTopicPartition(
		"my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError)
	mockBroker.Returns(metadataResponse)

	producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, NewTestConfig())
	if err != nil {
		t.Fatal(err)
	}
	broker := &Broker{
		addr: mockBroker.Addr(),
		id:   mockBroker.BrokerID(),
	}
	// Starts various goroutines in newBrokerProducer
	bp := producer.(*asyncProducer).getBrokerProducer(broker)
	// Initiate the shutdown of all of them
	producer.(*asyncProducer).unrefBrokerProducer(broker, bp)

	_ = producer.Close()
	mockBroker.Close()
}

// TestBrokerProducerWaitForSpaceEmptyBufferRollover ensures forced rollovers with an empty buffer
// do not deadlock waiting for responses when no partitions are muted.
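
The pattern the test relies on (start a run loop, shut it down, then confirm no goroutine is left behind) can be sketched with the standard library alone. The sketch below is not sarama code and does not use goleak: worker, newWorker, shutdown, leakedGoroutines and checkShutdown are hypothetical names, and comparing runtime.NumGoroutine against a baseline is a much cruder check than goleak.VerifyNone with goleak.IgnoreCurrent.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for a component that owns a run loop.
// It is NOT sarama's brokerProducer.
type worker struct {
	input chan int
	done  sync.WaitGroup
}

// newWorker starts the run loop in its own goroutine.
func newWorker() *worker {
	w := &worker{input: make(chan int)}
	w.done.Add(1)
	go w.run()
	return w
}

// run drains input until the channel is closed.
func (w *worker) run() {
	defer w.done.Done()
	for range w.input {
	}
}

// shutdown closes the input channel and waits for run to return.
func (w *worker) shutdown() {
	close(w.input)
	w.done.Wait()
}

// leakedGoroutines reports how many goroutines remain above the baseline,
// polling for up to one second so exiting goroutines can finish.
func leakedGoroutines(baseline int) int {
	deadline := time.Now().Add(time.Second)
	for {
		n := runtime.NumGoroutine() - baseline
		if n <= 0 || time.Now().After(deadline) {
			return n
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// checkShutdown records a baseline, starts and stops a worker, and
// returns the number of goroutines still running above the baseline.
func checkShutdown() int {
	baseline := runtime.NumGoroutine()
	w := newWorker()
	w.shutdown()
	return leakedGoroutines(baseline)
}

func main() {
	fmt.Println("leaked goroutines:", checkShutdown())
}
```

A goroutine count can only say that something is still running, while goleak inspects stack traces and names the offending goroutine. That is presumably why the real test ignores pre-existing goroutines with IgnoreCurrent and the go-metrics ticker with IgnoreTopFunction instead of counting.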

Callers

nothing calls this directly

Calls 13

AddBroker · Method · 0.95
Addr · Method · 0.95
BrokerID · Method · 0.95
AddTopicPartition · Method · 0.95
Returns · Method · 0.95
Close · Method · 0.95
Close · Method · 0.95
NewMockBroker · Function · 0.85
Fatal · Method · 0.80
getBrokerProducer · Method · 0.80
unrefBrokerProducer · Method · 0.80
NewAsyncProducer · Function · 0.70

Tested by

no test coverage detected