TestConsumerAbortNoGoroutineLeak verifies that brokerConsumer.abort() does not leak the subscriptionManager goroutine when children are already shutting down or already queued for redispatch.
(t *testing.T)
| 2375 | // not leak the subscriptionManager goroutine when children are already |
| 2376 | // shutting down or already queued for redispatch. |
| 2377 | func TestConsumerAbortNoGoroutineLeak(t *testing.T) { |
| 2378 | metrics.UseNilMetrics = true |
| 2379 | defer func() { metrics.UseNilMetrics = false }() |
| 2380 | |
| 2381 | defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) |
| 2382 | |
| 2383 | config := NewTestConfig() |
| 2384 | config.Consumer.Return.Errors = true |
| 2385 | |
| 2386 | broker0 := NewMockBroker(t, 0) |
| 2387 | defer broker0.Close() |
| 2388 | |
| 2389 | broker0.SetHandlerByMap(map[string]MockResponse{ |
| 2390 | "MetadataRequest": NewMockMetadataResponse(t). |
| 2391 | SetBroker(broker0.Addr(), broker0.BrokerID()). |
| 2392 | SetLeader("my_topic", 0, broker0.BrokerID()), |
| 2393 | "OffsetRequest": NewMockOffsetResponse(t). |
| 2394 | SetOffset("my_topic", 0, OffsetOldest, 0). |
| 2395 | SetOffset("my_topic", 0, OffsetNewest, 1000), |
| 2396 | }) |
| 2397 | |
| 2398 | client, err := NewClient([]string{broker0.Addr()}, config) |
| 2399 | require.NoError(t, err) |
| 2400 | defer client.Close() |
| 2401 | |
| 2402 | realBroker := client.Brokers()[0] |
| 2403 | |
| 2404 | newChild := func(errorsBuffer int) *partitionConsumer { |
| 2405 | return &partitionConsumer{ |
| 2406 | conf: config, |
| 2407 | topic: "my_topic", |
| 2408 | partition: 0, |
| 2409 | trigger: make(chan none, 1), |
| 2410 | dying: make(chan none), |
| 2411 | dispatcherStop: make(chan none), |
| 2412 | messages: make(chan *ConsumerMessage, config.ChannelBufferSize), |
| 2413 | errors: make(chan *ConsumerError, errorsBuffer), |
| 2414 | feeder: make(chan *partitionConsumerResponse, 1), |
| 2415 | } |
| 2416 | } |
| 2417 | |
| 2418 | newBrokerConsumer := func(child *partitionConsumer) *brokerConsumer { |
| 2419 | c := &consumer{ |
| 2420 | client: client, |
| 2421 | conf: config, |
| 2422 | children: make(map[string]map[int32]*partitionConsumer), |
| 2423 | brokerConsumers: make(map[*Broker]*brokerConsumer), |
| 2424 | metricRegistry: newCleanupRegistry(config.MetricRegistry), |
| 2425 | } |
| 2426 | child.consumer = c |
| 2427 | subscription := newBrokerSubscription(child) |
| 2428 | |
| 2429 | bc := &brokerConsumer{ |
| 2430 | consumer: c, |
| 2431 | broker: realBroker, |
| 2432 | input: make(chan *brokerSubscription), |
| 2433 | newSubscriptions: make(chan []*brokerSubscription), |
| 2434 | subscriptions: map[*partitionConsumer]*brokerSubscription{child: subscription}, |
nothing calls this directly
no test coverage detected