(self, r)
| 690 | self.message = message |
| 691 | |
def test_published_message_to_channel(self, r):
    """Check that a message published to a subscribed channel is read back.

    Subscribes a pubsub object obtained from ``r`` to ``"foo"``, checks the
    subscribe confirmation, publishes ``"test message"`` on ``"foo"`` and
    asserts that the next message read equals the one built by
    ``make_message("message", "foo", "test message")``.
    """
    pubsub = r.pubsub()
    pubsub.subscribe("foo")

    # The first message read back is expected to be the subscribe
    # confirmation for "foo".
    subscribe_ack = wait_for_message(pubsub)
    assert subscribe_ack == make_message("subscribe", "foo", 1)

    # NOTE(review): presumably the return value is the number of
    # subscribers that received the message -- confirm against PUBLISH docs.
    publish_result = r.publish("foo", "test message")
    assert publish_result == 1

    delivered = wait_for_message(pubsub)
    assert isinstance(delivered, dict)
    expected = make_message("message", "foo", "test message")
    assert delivered == expected
| 701 | |
| 702 | @pytest.mark.onlynoncluster |
| 703 | @skip_if_server_version_lt("7.0.0") |
nothing calls this directly
no test coverage detected