MCPcopy
hub / github.com/nats-io/nats.go / upsertConsumer

Function upsertConsumer

jetstream/consumer.go:287–349  ·  view source on GitHub ↗
(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string)

Source from the content-addressed store, hash-verified

285}
286
287func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (*consumerInfoResponse, error) {
288 ctx, cancel := js.wrapContextWithoutDeadline(ctx)
289 if cancel != nil {
290 defer cancel()
291 }
292 req := createConsumerRequest{
293 Stream: stream,
294 Config: &cfg,
295 Action: action,
296 }
297 reqJSON, err := json.Marshal(req)
298 if err != nil {
299 return nil, err
300 }
301
302 consumerName := cfg.Name
303 if consumerName == "" {
304 if cfg.Durable != "" {
305 consumerName = cfg.Durable
306 } else {
307 consumerName = generateConsName()
308 }
309 }
310 if err := validateConsumerName(consumerName); err != nil {
311 return nil, err
312 }
313
314 var ccSubj string
315 if cfg.FilterSubject != "" && len(cfg.FilterSubjects) == 0 {
316 if err := validateSubject(cfg.FilterSubject); err != nil {
317 return nil, err
318 }
319 ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
320 } else {
321 ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
322 }
323 var resp consumerInfoResponse
324
325 if _, err := js.apiRequestJSON(ctx, ccSubj, &resp, reqJSON); err != nil {
326 return nil, err
327 }
328 if resp.Error != nil {
329 if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
330 return nil, ErrStreamNotFound
331 }
332 if resp.Error.ErrorCode == JSErrCodeMaximumConsumersLimit {
333 return nil, ErrMaximumConsumersLimit
334 }
335
336 return nil, resp.Error
337 }
338
339 if resp.Error == nil && resp.ConsumerInfo == nil {
340 return nil, ErrConsumerCreationResponseEmpty
341 }
342
343 // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
344 if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 {

Callers 2

upsertPullConsumer (Function) · 0.85
upsertPushConsumer (Function) · 0.85

Calls 5

generateConsName (Function) · 0.85
validateConsumerName (Function) · 0.85
apiRequestJSON (Method) · 0.80
validateSubject (Function) · 0.70

Tested by

no test coverage detected