Internal implementation that all public functions will use.
(subject, queue string, cb Handler)
| 207 | |
| 208 | // Internal implementation that all public functions will use. |
| 209 | func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscription, error) { |
| 210 | if cb == nil { |
| 211 | return nil, errors.New("nats: Handler required for EncodedConn Subscription") |
| 212 | } |
| 213 | argType, numArgs := argInfo(cb) |
| 214 | if argType == nil { |
| 215 | return nil, errors.New("nats: Handler requires at least one argument") |
| 216 | } |
| 217 | |
| 218 | cbValue := reflect.ValueOf(cb) |
| 219 | wantsRaw := (argType == emptyMsgType) |
| 220 | |
| 221 | natsCB := func(m *Msg) { |
| 222 | var oV []reflect.Value |
| 223 | if wantsRaw { |
| 224 | oV = []reflect.Value{reflect.ValueOf(m)} |
| 225 | } else { |
| 226 | var oPtr reflect.Value |
| 227 | if argType.Kind() != reflect.Ptr { |
| 228 | oPtr = reflect.New(argType) |
| 229 | } else { |
| 230 | oPtr = reflect.New(argType.Elem()) |
| 231 | } |
| 232 | if err := c.Enc.Decode(m.Subject, m.Data, oPtr.Interface()); err != nil { |
| 233 | if c.Conn.Opts.AsyncErrorCB != nil { |
| 234 | c.Conn.ach.push(func() { |
| 235 | c.Conn.Opts.AsyncErrorCB(c.Conn, m.Sub, errors.New("nats: Got an error trying to unmarshal: "+err.Error())) |
| 236 | }) |
| 237 | } |
| 238 | return |
| 239 | } |
| 240 | if argType.Kind() != reflect.Ptr { |
| 241 | oPtr = reflect.Indirect(oPtr) |
| 242 | } |
| 243 | |
| 244 | // Callback Arity |
| 245 | switch numArgs { |
| 246 | case 1: |
| 247 | oV = []reflect.Value{oPtr} |
| 248 | case 2: |
| 249 | subV := reflect.ValueOf(m.Subject) |
| 250 | oV = []reflect.Value{subV, oPtr} |
| 251 | case 3: |
| 252 | subV := reflect.ValueOf(m.Subject) |
| 253 | replyV := reflect.ValueOf(m.Reply) |
| 254 | oV = []reflect.Value{subV, replyV, oPtr} |
| 255 | } |
| 256 | |
| 257 | } |
| 258 | cbValue.Call(oV) |
| 259 | } |
| 260 | |
| 261 | return c.Conn.subscribe(subject, queue, natsCB, nil, nil, false, nil) |
| 262 | } |
| 263 | |
| 264 | // FlushTimeout allows a Flush operation to have an associated timeout. |
| 265 | // |