(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string)
| 267 | } |
| 268 | |
| 269 | func upsertPushConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (PushConsumer, error) { |
| 270 | if cfg.DeliverSubject == "" { |
| 271 | return nil, ErrNotPushConsumer |
| 272 | } |
| 273 | |
| 274 | resp, err := upsertConsumer(ctx, js, stream, cfg, action) |
| 275 | if err != nil { |
| 276 | return nil, err |
| 277 | } |
| 278 | |
| 279 | return &pushConsumer{ |
| 280 | js: js, |
| 281 | stream: stream, |
| 282 | name: resp.Name, |
| 283 | info: resp.ConsumerInfo, |
| 284 | }, nil |
| 285 | } |
| 286 | |
| 287 | func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (*consumerInfoResponse, error) { |
| 288 | ctx, cancel := js.wrapContextWithoutDeadline(ctx) |
no test coverage detected