Consume can be used to continuously receive messages and handle them with the provided callback function. Consume cannot be used concurrently when using ordered consumer. See [Consumer.Consume] for more details.
(handler MessageHandler, opts ...PullConsumeOpt)
| 200 | // |
| 201 | // See [Consumer.Consume] for more details. |
| 202 | func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) { |
| 203 | if handler == nil { |
| 204 | return nil, ErrHandlerRequired |
| 205 | } |
| 206 | consumeOpts, err := parseConsumeOpts(false, opts...) |
| 207 | if err != nil { |
| 208 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) |
| 209 | } |
| 210 | |
| 211 | if len(p.info.Config.PriorityGroups) != 0 { |
| 212 | if consumeOpts.Group == "" { |
| 213 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is required for priority consumer") |
| 214 | } |
| 215 | |
| 216 | if !slices.Contains(p.info.Config.PriorityGroups, consumeOpts.Group) { |
| 217 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "invalid priority group") |
| 218 | } |
| 219 | } else if consumeOpts.Group != "" { |
| 220 | return nil, fmt.Errorf("%w: %s", ErrInvalidOption, "priority group is not supported for this consumer") |
| 221 | } |
| 222 | |
| 223 | p.Lock() |
| 224 | |
| 225 | subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name)) |
| 226 | |
| 227 | consumeID := nuid.Next() |
| 228 | sub := &pullSubscription{ |
| 229 | id: consumeID, |
| 230 | consumer: p, |
| 231 | errs: make(chan error, 10), |
| 232 | done: make(chan struct{}, 1), |
| 233 | fetchNext: make(chan *pullRequest, 1), |
| 234 | consumeOpts: consumeOpts, |
| 235 | } |
| 236 | sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING, nats.CLOSED) |
| 237 | |
| 238 | sub.hbMonitor = sub.scheduleHeartbeatCheck(consumeOpts.Heartbeat) |
| 239 | |
| 240 | p.subs.Store(sub.id, sub) |
| 241 | p.Unlock() |
| 242 | |
| 243 | internalHandler := func(msg *nats.Msg) { |
| 244 | if sub.hbMonitor != nil { |
| 245 | sub.hbMonitor.Stop() |
| 246 | } |
| 247 | userMsg, msgErr := checkMsg(msg) |
| 248 | if !userMsg && msgErr == nil { |
| 249 | if sub.hbMonitor != nil { |
| 250 | sub.hbMonitor.Reset(2 * consumeOpts.Heartbeat) |
| 251 | } |
| 252 | return |
| 253 | } |
| 254 | if !userMsg { |
| 255 | // heartbeat message |
| 256 | if msgErr == nil { |
| 257 | return |
| 258 | } |
| 259 |
nothing calls this directly
no test coverage detected