Function
subscribe
(ctx context.Context, rdb *redis.Client, topic string, subscriberId int, wg *sync.WaitGroup)
Source from the content-addressed store, hash-verified
| 147 | } |
| 148 | |
| 149 | func subscribe(ctx context.Context, rdb *redis.Client, topic string, subscriberId int, wg *sync.WaitGroup) { |
| 150 | defer wg.Done() |
| 151 | rec := rdb.Subscribe(ctx, topic) |
| 152 | recChan := rec.Channel() |
| 153 | for { |
| 154 | select { |
| 155 | case <-ctx.Done(): |
| 156 | rec.Close() |
| 157 | return |
| 158 | default: |
| 159 | select { |
| 160 | case <-ctx.Done(): |
| 161 | rec.Close() |
| 162 | return |
| 163 | case msg := <-recChan: |
| 164 | err := rdb.Incr(ctx, "received").Err() |
| 165 | if err != nil { |
| 166 | if err.Error() != "context canceled" { |
| 167 | log.Printf("%s\n", err.Error()) |
| 168 | cntErrors.Add(1) |
| 169 | } |
| 170 | } |
| 171 | _ = msg // Use the message to avoid unused variable warning |
| 172 | } |
| 173 | } |
| 174 | } |
| 175 | } |
Tested by
no test coverage detected