MCPcopy
hub / github.com/nats-io/nats.go / subscribe

Method subscribe

enc.go:209–262  ·  view source on GitHub ↗

Internal implementation that all public functions will use.

(subject, queue string, cb Handler)

Source from the content-addressed store, hash-verified

207
208// Internal implementation that all public functions will use.
209func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) {
210 if cb == nil {
211 return nil, errors.New("nats: Handler required for EncodedConn Subscription")
212 }
213 argType, numArgs := argInfo(cb)
214 if argType == nil {
215 return nil, errors.New("nats: Handler requires at least one argument")
216 }
217
218 cbValue := reflect.ValueOf(cb)
219 wantsRaw := (argType == emptyMsgType)
220
221 natsCB := func(m *Msg) {
222 var oV []reflect.Value
223 if wantsRaw {
224 oV = []reflect.Value{reflect.ValueOf(m)}
225 } else {
226 var oPtr reflect.Value
227 if argType.Kind() != reflect.Ptr {
228 oPtr = reflect.New(argType)
229 } else {
230 oPtr = reflect.New(argType.Elem())
231 }
232 if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
233 if c.Conn.Opts.AsyncErrorCB != nil {
234 c.Conn.ach.push(func() {
235 c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error()))
236 })
237 }
238 return
239 }
240 if argType.Kind() != reflect.Ptr {
241 oPtr = reflect.Indirect(oPtr)
242 }
243
244 // Callback Arity
245 switch numArgs {
246 case 1:
247 oV = []reflect.Value{oPtr}
248 case 2:
249 subV := reflect.ValueOf(m.Subject)
250 oV = []reflect.Value{subV, oPtr}
251 case 3:
252 subV := reflect.ValueOf(m.Subject)
253 replyV := reflect.ValueOf(m.Reply)
254 oV = []reflect.Value{subV, replyV, oPtr}
255 }
256
257 }
258 cbValue.Call(oV)
259 }
260
261 return c.Conn.subscribe(subject, queue, natsCB, nil, nil, false, nil)
262}
263
264// FlushTimeout allows a Flush operation to have an associated timeout.
265//

Callers 2

SubscribeMethod · 0.95
QueueSubscribeMethod · 0.95

Calls 4

argInfoFunction · 0.85
DecodeMethod · 0.65
ErrorMethod · 0.65
pushMethod · 0.45

Tested by

no test coverage detected