Close implements ConsumerGroup.
()
| 171 | |
| 172 | // Close implements ConsumerGroup. |
| 173 | func (c *consumerGroup) Close() (err error) { |
| 174 | c.closeOnce.Do(func() { |
| 175 | close(c.closed) |
| 176 | |
| 177 | // leave group |
| 178 | if e := c.leave(); e != nil { |
| 179 | err = e |
| 180 | } |
| 181 | |
| 182 | go func() { |
| 183 | c.errorsLock.Lock() |
| 184 | defer c.errorsLock.Unlock() |
| 185 | close(c.errors) |
| 186 | }() |
| 187 | |
| 188 | // drain errors |
| 189 | for e := range c.errors { |
| 190 | err = e |
| 191 | } |
| 192 | |
| 193 | if e := c.client.Close(); e != nil { |
| 194 | err = e |
| 195 | } |
| 196 | |
| 197 | c.metricRegistry.UnregisterAll() |
| 198 | }) |
| 199 | return |
| 200 | } |
| 201 | |
| 202 | // Consume implements ConsumerGroup. |
| 203 | func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error { |
nothing calls this directly
no test coverage detected