(self)
| 64 | assert not x.enabled |
| 65 | |
def test_send(self):
    """Exercise ``Dispatcher.send`` with dispatching and buffering toggled.

    Scenarios checked, in order:

    1. Dispatcher enabled: the sent event is recorded by the producer.
    2. Dispatcher disabled: the sent event is not recorded.
    3. Producer set to raise on publish with buffering off: ``send``
       raises ``KeyError``.
    4. Producer set to raise on publish with buffering on: the sends
       complete, and once the producer stops raising a ``flush()``
       makes every one of those events visible on the producer.
    """
    # Stand-in broker connection reporting an AMQP transport.
    conn = Mock()
    conn.transport.driver_type = 'amqp'

    publisher = MockProducer()
    publisher.connection = self.app.connection_for_write()

    dispatcher = self.app.events.Dispatcher(
        conn,
        enabled=False,
        buffer_while_offline=False,
    )
    dispatcher.producer = publisher

    # 1. Enabled: the event reaches the producer.
    dispatcher.enabled = True
    dispatcher.send('World War II', ended=True)
    assert publisher.has_event('World War II')

    # 2. Disabled: nothing is published.
    dispatcher.enabled = False
    dispatcher.send('World War III')
    assert not publisher.has_event('World War III')

    buffered_names = ('Event 1', 'Event 2', 'Event 3')

    # 3. Publishing fails and buffering is off, so the error propagates.
    dispatcher.enabled = True
    dispatcher.producer.raise_on_publish = True
    dispatcher.buffer_while_offline = False
    with pytest.raises(KeyError):
        dispatcher.send('Event X')

    # 4. Publishing still fails, but with buffering on the sends succeed.
    dispatcher.buffer_while_offline = True
    for name in buffered_names:
        dispatcher.send(name)

    # Let publishing succeed again, then flush the held events.
    dispatcher.producer.raise_on_publish = False
    dispatcher.flush()
    for name in buffered_names:
        assert publisher.has_event(name)

    # NOTE(review): presumably checks that a second flush, with nothing
    # left buffered, is harmless -- confirm against Dispatcher.flush.
    dispatcher.flush()
| 95 | |
| 96 | def test_send_buffer_group(self): |
| 97 | buf_received = [None] |
nothing calls this directly
no test coverage detected