(broker *Broker)
| 1033 | } |
| 1034 | |
| 1035 | func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer { |
| 1036 | bc := &brokerConsumer{ |
| 1037 | consumer: c, |
| 1038 | broker: broker, |
| 1039 | input: make(chan *brokerSubscription), |
| 1040 | newSubscriptions: make(chan []*brokerSubscription), |
| 1041 | subscriptions: make(map[*partitionConsumer]*brokerSubscription), |
| 1042 | refs: 0, |
| 1043 | stop: make(chan none), |
| 1044 | } |
| 1045 | |
| 1046 | go withRecover(bc.subscriptionManager) |
| 1047 | go withRecover(bc.subscriptionConsumer) |
| 1048 | |
| 1049 | return bc |
| 1050 | } |
| 1051 | |
| 1052 | // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer |
| 1053 | // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks |
no test coverage detected