MCPcopy
hub / github.com/nats-io/nats.go / Consume

Method Consume

jetstream/push.go:47–178  ·  view source on GitHub ↗
(handler MessageHandler, opts ...PushConsumeOpt)

Source from the content-addressed store, hash-verified

45)
46
47func (p *pushConsumer) Consume(handler MessageHandler, opts ...PushConsumeOpt) (ConsumeContext, error) {
48 if handler == nil {
49 return nil, ErrHandlerRequired
50 }
51 consumeOpts := &pushConsumeOpts{}
52 for _, opt := range opts {
53 if err := opt.configurePushConsume(consumeOpts); err != nil {
54 return nil, err
55 }
56 }
57
58 p.Lock()
59 defer p.Unlock()
60
61 if p.info == nil {
62 return nil, ErrConsumerNotFound
63 }
64
65 if p.started.Load() {
66 return nil, ErrConsumerAlreadyConsuming
67 }
68
69 consumeID := nuid.Next()
70 sub := &pushSubscription{
71 id: consumeID,
72 errs: make(chan error, 1),
73 done: make(chan struct{}, 1),
74 consumeOpts: consumeOpts,
75 connStatusChanged: p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING),
76 idleHeartbeat: p.info.Config.IdleHeartbeat,
77 }
78
79 sub.hbMonitor = sub.scheduleHeartbeatCheck(sub.idleHeartbeat)
80 internalHandler := func(msg *nats.Msg) {
81 if sub.hbMonitor != nil {
82 sub.hbMonitor.Stop()
83 }
84 defer func() {
85 if sub.hbMonitor != nil {
86 sub.hbMonitor.Reset(2 * sub.idleHeartbeat)
87 }
88 }()
89 status, descr := msg.Header.Get("Status"), msg.Header.Get("Description")
90 if status == "" {
91 jsMsg := p.js.toJSMsg(msg)
92 handler(jsMsg)
93 return
94 }
95 sub.Lock()
96 if err, terminate := sub.handleStatusMsg(msg, status, descr); err != nil {
97 if sub.consumeOpts.ErrHandler != nil {
98 sub.consumeOpts.ErrHandler(sub, err)
99 }
100 if terminate {
101 sub.Stop()
102 }
103 }
104 sub.Unlock()

Callers

nothing calls this directly

Calls 15

handleStatusMsg (Method) · 0.95
Stop (Method) · 0.95
Load (Method) · 0.80
toJSMsg (Method) · 0.80
Store (Method) · 0.80
configurePushConsume (Method) · 0.65
Next (Method) · 0.65
Stop (Method) · 0.65
Reset (Method) · 0.65
Get (Method) · 0.65
QueueSubscribe (Method) · 0.65

Tested by

no test coverage detected