MCPcopy
hub / github.com/redis/redis-py / test_push_handler

Method test_push_handler

tests/test_pubsub.py:865–874  ·  view source on GitHub ↗
(self, r)

Source from the content-addressed store, hash-verified

863 self.message = ["my handler", message]
864
865 def test_push_handler(self, r):
866 if is_resp2_connection(r):
867 return
868 p = r.pubsub(push_handler_func=self.my_handler)
869 p.subscribe("foo")
870 assert wait_for_message(p) is None
871 assert self.message == ["my handler", [b"subscribe", b"foo", 1]]
872 assert r.publish("foo", "test message") == 1
873 assert wait_for_message(p) is None
874 assert self.message == ["my handler", [b"message", b"foo", b"test message"]]
875
876 @skip_if_server_version_lt("7.0.0")
877 def test_push_handler_sharded_pubsub(self, r):

Callers

nothing calls this directly

Calls 5

is_resp2_connectionFunction · 0.85
publishMethod · 0.80
wait_for_messageFunction · 0.70
pubsubMethod · 0.45
subscribeMethod · 0.45

Tested by

no test coverage detected