(self, r)
| 753 | self.message = ["my handler", message] |
| 754 | |
async def test_push_handler(self, r):
    """Check that pubsub traffic is delivered to a custom push handler.

    A pubsub object is created with ``push_handler_func=self.my_handler``;
    after subscribing to ``"foo"`` and after publishing to it, the test
    expects ``wait_for_message`` to yield ``None`` and ``self.message`` to
    hold the handler-tagged payload.

    The test returns early, asserting nothing, when
    ``get_protocol_version(r)`` is ``2``, ``"2"`` or ``None``
    (NOTE(review): presumably because only newer protocol versions emit
    push messages -- confirm).
    """
    protocol = get_protocol_version(r)
    if protocol in (2, "2", None):
        return

    pubsub = r.pubsub(push_handler_func=self.my_handler)
    await pubsub.subscribe("foo")

    # Nothing should come back through the regular polling path; the
    # subscribe confirmation is expected on self.message instead.
    polled = await wait_for_message(pubsub)
    assert polled is None
    expected_subscribe = ["my handler", [b"subscribe", b"foo", 1]]
    assert self.message == expected_subscribe

    # publish() is expected to report exactly one receiver.
    receivers = await r.publish("foo", "test message")
    assert receivers == 1

    polled = await wait_for_message(pubsub)
    assert polled is None
    expected_message = ["my handler", [b"message", b"foo", b"test message"]]
    assert self.message == expected_message
| 765 | |
| 766 | |
| 767 | @pytest.mark.onlynoncluster |
nothing calls this directly
no test coverage detected