(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string)
| 285 | } |
| 286 | |
| 287 | func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (*consumerInfoResponse, error) { |
| 288 | ctx, cancel := js.wrapContextWithoutDeadline(ctx) |
| 289 | if cancel != nil { |
| 290 | defer cancel() |
| 291 | } |
| 292 | req := createConsumerRequest{ |
| 293 | Stream: stream, |
| 294 | Config: &cfg, |
| 295 | Action: action, |
| 296 | } |
| 297 | reqJSON, err := json.Marshal(req) |
| 298 | if err != nil { |
| 299 | return nil, err |
| 300 | } |
| 301 | |
| 302 | consumerName := cfg.Name |
| 303 | if consumerName == "" { |
| 304 | if cfg.Durable != "" { |
| 305 | consumerName = cfg.Durable |
| 306 | } else { |
| 307 | consumerName = generateConsName() |
| 308 | } |
| 309 | } |
| 310 | if err := validateConsumerName(consumerName); err != nil { |
| 311 | return nil, err |
| 312 | } |
| 313 | |
| 314 | var ccSubj string |
| 315 | if cfg.FilterSubject != "" && len(cfg.FilterSubjects) == 0 { |
| 316 | if err := validateSubject(cfg.FilterSubject); err != nil { |
| 317 | return nil, err |
| 318 | } |
| 319 | ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject) |
| 320 | } else { |
| 321 | ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName) |
| 322 | } |
| 323 | var resp consumerInfoResponse |
| 324 | |
| 325 | if _, err := js.apiRequestJSON(ctx, ccSubj, &resp, reqJSON); err != nil { |
| 326 | return nil, err |
| 327 | } |
| 328 | if resp.Error != nil { |
| 329 | if resp.Error.ErrorCode == JSErrCodeStreamNotFound { |
| 330 | return nil, ErrStreamNotFound |
| 331 | } |
| 332 | if resp.Error.ErrorCode == JSErrCodeMaximumConsumersLimit { |
| 333 | return nil, ErrMaximumConsumersLimit |
| 334 | } |
| 335 | |
| 336 | return nil, resp.Error |
| 337 | } |
| 338 | |
| 339 | if resp.Error == nil && resp.ConsumerInfo == nil { |
| 340 | return nil, ErrConsumerCreationResponseEmpty |
| 341 | } |
| 342 | |
| 343 | // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo |
| 344 | if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 { |
no test coverage detected