MCPcopy
hub / github.com/nats-io/nats.go / bindRecvChan

Method bindRecvChan

netchan.go:80–117  ·  view source on GitHub ↗

Internal function to bind receive operations for a channel.

(subject, queue string, channel any)

Source from the content-addressed store, hash-verified

78
79// Internal function to bind receive operations for a channel.
80func (c *EncodedConn) bindRecvChan(subject, queue string, channel any) (*Subscription, error) {
81 chVal := reflect.ValueOf(channel)
82 if chVal.Kind() != reflect.Chan {
83 return nil, ErrChanArg
84 }
85 argType := chVal.Type().Elem()
86
87 cb := func(m *Msg) {
88 var oPtr reflect.Value
89 if argType.Kind() != reflect.Ptr {
90 oPtr = reflect.New(argType)
91 } else {
92 oPtr = reflect.New(argType.Elem())
93 }
94 if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil {
95 c.Conn.err = errors.New("nats: Got an error trying to unmarshal: " + err.Error())
96 if c.Conn.Opts.AsyncErrorCB != nil {
97 c.Conn.ach.push(func() { c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, c.Conn.err) })
98 }
99 return
100 }
101 if argType.Kind() != reflect.Ptr {
102 oPtr = reflect.Indirect(oPtr)
103 }
104 // This is a bit hacky, but in this instance we may be trying to send to a closed channel.
105 // and the user does not know when it is safe to close the channel.
106 defer func() {
107 // If we have panicked, recover and close the subscription.
108 if r := recover(); r != nil {
109 m.Sub.Unsubscribe()
110 }
111 }()
112 // Actually do the send to the channel.
113 chVal.Send(oPtr)
114 }
115
116 return c.Conn.subscribe(subject, queue, cb, nil, nil, false, nil)
117}

Callers 2

BindRecvChan (Method) · 0.95
BindRecvQueueChan (Method) · 0.95

Calls 6

Type (Method) · 0.80
Unsubscribe (Method) · 0.80
Decode (Method) · 0.65
Error (Method) · 0.65
push (Method) · 0.45
subscribe (Method) · 0.45

Tested by

no test coverage detected