MCPcopy
hub / github.com/IBM/sarama / newBrokerConsumer

Method newBrokerConsumer

consumer.go:1035–1050  ·  view source on GitHub ↗
(broker *Broker)

Source from the content-addressed store, hash-verified

1033}
1034
1035func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
1036 bc := &brokerConsumer{
1037 consumer: c,
1038 broker: broker,
1039 input: make(chan *brokerSubscription),
1040 newSubscriptions: make(chan []*brokerSubscription),
1041 subscriptions: make(map[*partitionConsumer]*brokerSubscription),
1042 refs: 0,
1043 stop: make(chan none),
1044 }
1045
1046 go withRecover(bc.subscriptionManager)
1047 go withRecover(bc.subscriptionConsumer)
1048
1049 return bc
1050}
1051
1052// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
1053// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks

Callers 1

refBrokerConsumerMethod · 0.95

Calls 1

withRecoverFunction · 0.85

Tested by

no test coverage detected