(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string)
| 251 | } |
| 252 | |
| 253 | func upsertPullConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) { |
| 254 | resp, err := upsertConsumer(ctx, js, stream, cfg, action) |
| 255 | if err != nil { |
| 256 | return nil, err |
| 257 | } |
| 258 | |
| 259 | return &pullConsumer{ |
| 260 | js: js, |
| 261 | stream: stream, |
| 262 | name: resp.Name, |
| 263 | durable: cfg.Durable != "", |
| 264 | info: resp.ConsumerInfo, |
| 265 | subs: syncx.Map[string, *pullSubscription]{}, |
| 266 | }, nil |
| 267 | } |
| 268 | |
| 269 | func upsertPushConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (PushConsumer, error) { |
| 270 | if cfg.DeliverSubject == "" { |
no test coverage detected