Internal function to bind receive operations for a channel.
(subject, queue string, channel any)
| 78 | |
| 79 | // Internal function to bind receive operations for a channel. |
| 80 | func (c *EncodedConn) bindRecvChan(subject, queue string, channel any) (*Subscription, error) { |
| 81 | chVal := reflect.ValueOf(channel) |
| 82 | if chVal.Kind() != reflect.Chan { |
| 83 | return nil, ErrChanArg |
| 84 | } |
| 85 | argType := chVal.Type().Elem() |
| 86 | |
| 87 | cb := func(m *Msg) { |
| 88 | var oPtr reflect.Value |
| 89 | if argType.Kind() != reflect.Ptr { |
| 90 | oPtr = reflect.New(argType) |
| 91 | } else { |
| 92 | oPtr = reflect.New(argType.Elem()) |
| 93 | } |
| 94 | if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil { |
| 95 | c.Conn.err = errors.New("nats: Got an error trying to unmarshal: " + err.Error()) |
| 96 | if c.Conn.Opts.AsyncErrorCB != nil { |
| 97 | c.Conn.ach.push(func() { c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, c.Conn.err) }) |
| 98 | } |
| 99 | return |
| 100 | } |
| 101 | if argType.Kind() != reflect.Ptr { |
| 102 | oPtr = reflect.Indirect(oPtr) |
| 103 | } |
| 104 | // This is a bit hacky, but in this instance we may be trying to send to a closed channel. |
| 105 | // and the user does not know when it is safe to close the channel. |
| 106 | defer func() { |
| 107 | // If we have panicked, recover and close the subscription. |
| 108 | if r := recover(); r != nil { |
| 109 | m.Sub.Unsubscribe() |
| 110 | } |
| 111 | }() |
| 112 | // Actually do the send to the channel. |
| 113 | chVal.Send(oPtr) |
| 114 | } |
| 115 | |
| 116 | return c.Conn.subscribe(subject, queue, cb, nil, nil, false, nil) |
| 117 | } |