MCPcopy
hub / github.com/IBM/sarama / TestConsumerAbortNoGoroutineLeak

Function TestConsumerAbortNoGoroutineLeak

consumer_test.go:2377–2583  ·  view source on GitHub ↗

TestConsumerAbortNoGoroutineLeak verifies that brokerConsumer.abort() does not leak the subscriptionManager goroutine when children are already shutting down or already queued for redispatch.

(t *testing.T)

Source from the content-addressed store, hash-verified

2375// not leak the subscriptionManager goroutine when children are already
2376// shutting down or already queued for redispatch.
2377func TestConsumerAbortNoGoroutineLeak(t *testing.T) {
2378 metrics.UseNilMetrics = true
2379 defer func() { metrics.UseNilMetrics = false }()
2380
2381 defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
2382
2383 config := NewTestConfig()
2384 config.Consumer.Return.Errors = true
2385
2386 broker0 := NewMockBroker(t, 0)
2387 defer broker0.Close()
2388
2389 broker0.SetHandlerByMap(map[string]MockResponse{
2390 "MetadataRequest": NewMockMetadataResponse(t).
2391 SetBroker(broker0.Addr(), broker0.BrokerID()).
2392 SetLeader("my_topic", 0, broker0.BrokerID()),
2393 "OffsetRequest": NewMockOffsetResponse(t).
2394 SetOffset("my_topic", 0, OffsetOldest, 0).
2395 SetOffset("my_topic", 0, OffsetNewest, 1000),
2396 })
2397
2398 client, err := NewClient([]string{broker0.Addr()}, config)
2399 require.NoError(t, err)
2400 defer client.Close()
2401
2402 realBroker := client.Brokers()[0]
2403
2404 newChild := func(errorsBuffer int) *partitionConsumer {
2405 return &partitionConsumer{
2406 conf: config,
2407 topic: "my_topic",
2408 partition: 0,
2409 trigger: make(chan none, 1),
2410 dying: make(chan none),
2411 dispatcherStop: make(chan none),
2412 messages: make(chan *ConsumerMessage, config.ChannelBufferSize),
2413 errors: make(chan *ConsumerError, errorsBuffer),
2414 feeder: make(chan *partitionConsumerResponse, 1),
2415 }
2416 }
2417
2418 newBrokerConsumer := func(child *partitionConsumer) *brokerConsumer {
2419 c := &consumer{
2420 client: client,
2421 conf: config,
2422 children: make(map[string]map[int32]*partitionConsumer),
2423 brokerConsumers: make(map[*Broker]*brokerConsumer),
2424 metricRegistry: newCleanupRegistry(config.MetricRegistry),
2425 }
2426 child.consumer = c
2427 subscription := newBrokerSubscription(child)
2428
2429 bc := &brokerConsumer{
2430 consumer: c,
2431 broker: realBroker,
2432 input: make(chan *brokerSubscription),
2433 newSubscriptions: make(chan []*brokerSubscription),
2434 subscriptions: map[*partitionConsumer]*brokerSubscription{child: subscription},

Callers

nothing calls this directly

Calls 15

Close · Method · 0.95
SetHandlerByMap · Method · 0.95
Addr · Method · 0.95
BrokerID · Method · 0.95
Close · Method · 0.95
Brokers · Method · 0.95
abort · Method · 0.95
queueSubscription · Method · 0.95
NewMockBroker · Function · 0.85
NewMockMetadataResponse · Function · 0.85
NewMockOffsetResponse · Function · 0.85
newCleanupRegistry · Function · 0.85

Tested by

no test coverage detected