(
args: tuple[Any, ...], kwargs: Mapping[str, PubSubHandler]
)
| 33 | |
| 34 | |
| 35 | def parse_pubsub_subscriptions( |
| 36 | args: tuple[Any, ...], kwargs: Mapping[str, PubSubHandler] |
| 37 | ) -> dict[ChannelT, PubSubHandler | None]: |
| 38 | parsed_args = list_or_args(args[0], args[1:]) if args else [] |
| 39 | subscriptions: dict[ChannelT, PubSubHandler | None] = {} |
| 40 | for arg in parsed_args: |
| 41 | if isinstance(arg, Subscription): |
| 42 | subscriptions[arg.name] = arg.handler |
| 43 | else: |
| 44 | subscriptions[arg] = None |
| 45 | subscriptions.update(kwargs) |
| 46 | return subscriptions |
| 47 | |
| 48 | |
| 49 | def pubsub_subscription_args( |
no test coverage detected