hub / github.com/redis/redis-py / parse_pubsub_subscriptions

Function parse_pubsub_subscriptions

redis/commands/helpers.py:35–46
(
    args: tuple[Any, ...], kwargs: Mapping[str, PubSubHandler]
)

Source from the content-addressed store, hash-verified

def parse_pubsub_subscriptions(
    args: tuple[Any, ...], kwargs: Mapping[str, PubSubHandler]
) -> dict[ChannelT, PubSubHandler | None]:
    parsed_args = list_or_args(args[0], args[1:]) if args else []
    subscriptions: dict[ChannelT, PubSubHandler | None] = {}
    for arg in parsed_args:
        if isinstance(arg, Subscription):
            subscriptions[arg.name] = arg.handler
        else:
            subscriptions[arg] = None
    subscriptions.update(kwargs)
    return subscriptions
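
The merging behavior of the listing above can be tried in isolation. The sketch below is not the library's code path: `Subscription`, `PubSubHandler`, and `list_or_args` are simplified, hypothetical stand-ins defined locally so the example runs without redis-py installed, and `on_news` / `on_alerts` are made-up handlers. It assumes only what the listing shows, namely that a `Subscription` exposes `name` and `handler`.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Hypothetical stand-in for the handler type used in the listing.
PubSubHandler = Callable[[Any], None]


@dataclass(frozen=True)
class Subscription:
    """Hypothetical stand-in: only the two attributes the listing reads."""
    name: str
    handler: Optional[PubSubHandler] = None


def list_or_args(keys: Any, args: tuple) -> list:
    """Simplified stand-in: merge a first argument and the rest into one list."""
    if isinstance(keys, (str, bytes)):
        merged = [keys]          # a single channel name, not an iterable of names
    else:
        try:
            merged = list(keys)  # an iterable of channels
        except TypeError:
            merged = [keys]      # a single non-iterable object, e.g. a Subscription
    merged.extend(args)
    return merged


def parse_pubsub_subscriptions(args, kwargs):
    # Same body as the listing, with type annotations dropped for brevity.
    parsed_args = list_or_args(args[0], args[1:]) if args else []
    subscriptions = {}
    for arg in parsed_args:
        if isinstance(arg, Subscription):
            subscriptions[arg.name] = arg.handler
        else:
            subscriptions[arg] = None
    subscriptions.update(kwargs)
    return subscriptions


def on_news(message): ...
def on_alerts(message): ...


# Bare names map to None, Subscription objects carry their handler,
# and keyword arguments are merged in last.
mixed = parse_pubsub_subscriptions(
    ("weather", Subscription("news", on_news)),
    {"alerts": on_alerts},
)

# A keyword argument overrides a positional entry with the same channel name.
overridden = parse_pubsub_subscriptions(("alerts",), {"alerts": on_alerts})

# With no arguments at all, the result is an empty dict.
empty = parse_pubsub_subscriptions((), {})
```

Because `subscriptions.update(kwargs)` runs after the positional loop, a handler passed by keyword replaces whatever the same channel received positionally.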

Callers 8

psubscribe · Method · 0.90
subscribe · Method · 0.90
ssubscribe · Method · 0.90
ssubscribe · Method · 0.90
psubscribe · Method · 0.90
subscribe · Method · 0.90
ssubscribe · Method · 0.90
ssubscribe · Method · 0.90

Calls 2

list_or_args · Function · 0.85
update · Method · 0.80

Tested by

no test coverage detected