(self, r)
| 863 | self.message = ["my handler", message] |
| 864 | |
| 865 | def test_push_handler(self, r): |
| 866 | if is_resp2_connection(r): |
| 867 | return |
| 868 | p = r.pubsub(push_handler_func=self.my_handler) |
| 869 | p.subscribe("foo") |
| 870 | assert wait_for_message(p) is None |
| 871 | assert self.message == ["my handler", [b"subscribe", b"foo", 1]] |
| 872 | assert r.publish("foo", "test message") == 1 |
| 873 | assert wait_for_message(p) is None |
| 874 | assert self.message == ["my handler", [b"message", b"foo", b"test message"]] |
| 875 | |
| 876 | @skip_if_server_version_lt("7.0.0") |
| 877 | def test_push_handler_sharded_pubsub(self, r): |
nothing calls this directly
no test coverage detected