hub / github.com/redis/redis-py / test_push_handler

Method test_push_handler

tests/test_asyncio/test_pubsub.py:755–764
(self, r)

Source from the content-addressed store, hash-verified

        self.message = ["my handler", message]

    async def test_push_handler(self, r):
        if get_protocol_version(r) in [2, "2", None]:
            return
        p = r.pubsub(push_handler_func=self.my_handler)
        await p.subscribe("foo")
        assert await wait_for_message(p) is None
        assert self.message == ["my handler", [b"subscribe", b"foo", 1]]
        assert await r.publish("foo", "test message") == 1
        assert await wait_for_message(p) is None
        assert self.message == ["my handler", [b"message", b"foo", b"test message"]]


@pytest.mark.onlynoncluster
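
The assertions in the listing suggest that, when a push handler is registered, each incoming frame goes to the handler and the polling call returns None. The sketch below imitates that flow without a Redis server. `FakePubSub`, `Recorder`, and `deliver` are hypothetical stand-ins invented for illustration; they are not redis-py classes or methods, and the real client's internals may differ.

```python
import asyncio
from collections import deque


class FakePubSub:
    """Hypothetical stand-in for a pub/sub object; NOT the redis-py PubSub class."""

    def __init__(self, push_handler_func=None):
        self._push_handler_func = push_handler_func
        self._incoming = deque()  # frames waiting to be read

    async def subscribe(self, channel):
        # Assumption: the server confirms a subscription with a frame like this.
        self._incoming.append([b"subscribe", channel.encode(), 1])

    async def deliver(self, channel, payload):
        # Hypothetical helper that plays the role of a published message arriving.
        self._incoming.append([b"message", channel.encode(), payload.encode()])

    async def get_message(self):
        if not self._incoming:
            return None
        frame = self._incoming.popleft()
        if self._push_handler_func is not None:
            # With a handler registered, the frame is routed to it
            # and nothing is returned to the caller.
            self._push_handler_func(frame)
            return None
        return frame


class Recorder:
    """Mirrors the shape of the test's my_handler: store the last frame seen."""

    def __init__(self):
        self.message = None

    def my_handler(self, message):
        self.message = ["my handler", message]


async def demo():
    rec = Recorder()
    p = FakePubSub(push_handler_func=rec.my_handler)
    await p.subscribe("foo")
    first = await p.get_message()
    after_subscribe = rec.message
    await p.deliver("foo", "test message")
    second = await p.get_message()
    return first, after_subscribe, second, rec.message


result = asyncio.run(demo())
```

In this sketch both polls return None while the recorder holds the subscribe frame and then the message frame, which is the same sequence the test asserts against a real server.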

Callers

nothing calls this directly

Calls 5

get_protocol_version · Function · 0.90
publish · Method · 0.80
wait_for_message · Function · 0.70
pubsub · Method · 0.45
subscribe · Method · 0.45

Tested by

no test coverage detected