ReceiveTimeout acts like Receive but returns an error if a message is not received in time. This is a low-level API; in most cases Channel should be used instead.
(ctx context.Context, timeout time.Duration)
| 488 | // is not received in time. This is a low-level API and in most cases |
| 489 | // Channel should be used instead. |
| 490 | func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (interface{}, error) { |
| 491 | if c.cmd == nil { |
| 492 | c.cmd = NewCmd(ctx) |
| 493 | } |
| 494 | |
| 495 | // Don't hold the lock across the read below, so that subscriptions and pings can proceed; the connection is only obtained via connWithLock here and handed back via releaseConnWithLock after the read. |
| 496 | cn, err := c.connWithLock(ctx) |
| 497 | if err != nil { |
| 498 | return nil, err |
| 499 | } |
| 500 | |
| 501 | err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error { |
| 502 | // Process any pending push notifications on this reader first, so that none are left buffered ahead of the reply read by readReply below. |
| 503 | if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { |
| 504 | // Best effort: log the error (with the connection ID) and still go on to read the reply instead of failing the command. |
| 505 | // Push notification processing errors shouldn't break normal Redis operations. |
| 506 | internal.Logger.Printf(ctx, "push: conn[%d] error processing pending notifications before reading reply: %v", cn.GetID(), err) |
| 507 | } |
| 508 | return c.cmd.readReply(rd) |
| 509 | }) |
| 510 | c.releaseConnWithLock(ctx, cn, err, timeout > 0) |
| 511 | |
| 512 | if err != nil { |
| 513 | return nil, err |
| 514 | } |
| 515 | |
| 516 | return c.newMessage(ctx, cn, c.cmd.Val()) |
| 517 | } |
| 518 | |
| 519 | // Receive returns a message as a Subscription, Message, Pong or error. |
| 520 | // See PubSub example for details. This is a low-level API and in most cases |