| 155 | } |
| 156 | |
| 157 | func (g *Generator) stopping(_ error) error { |
| 158 | g.stopIncomingRequests() |
| 159 | |
| 160 | // Stop reading from queue and wait for outstanding data to be processed and committed. |
| 161 | if g.cfg.ConsumeFromKafka { |
| 162 | g.stopKafka() |
| 163 | } |
| 164 | |
| 165 | var wg sync.WaitGroup |
| 166 | wg.Add(len(g.instances)) |
| 167 | |
| 168 | for _, inst := range g.instances { |
| 169 | go func(inst *instance) { |
| 170 | inst.shutdown() |
| 171 | wg.Done() |
| 172 | }(inst) |
| 173 | } |
| 174 | |
| 175 | wg.Wait() |
| 176 | |
| 177 | return nil |
| 178 | } |
| 179 | |
| 180 | // stopIncomingRequests marks the generator as read-only, refusing push requests |
| 181 | func (g *Generator) stopIncomingRequests() { |