MCPcopy
hub / github.com/celery/celery / test_send

Method test_send

t/unit/events/test_events.py:66–94  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

64 assert not x.enabled
65
66 def test_send(self):
# Exercise Dispatcher.send() in four situations: enabled, disabled,
# publish failure without buffering, and publish failure with buffering
# followed by flush().
#
# MockProducer replaces the dispatcher's producer so the test can ask it,
# via has_event(), which event types were sent.
67 producer = MockProducer()
68 producer.connection = self.app.connection_for_write()
# The dispatcher itself gets a Mock connection whose transport reports the
# 'amqp' driver type.
69 connection = Mock()
70 connection.transport.driver_type = 'amqp'
# Created disabled and without offline buffering; both flags are toggled
# explicitly below.
71 eventer = self.app.events.Dispatcher(connection, enabled=False,
72 buffer_while_offline=False)
73 eventer.producer = producer
# Enabled: the event must reach the producer.
74 eventer.enabled = True
75 eventer.send('World War II', ended=True)
76 assert producer.has_event('World War II')
# Disabled: send() must not hand the event to the producer.
77 eventer.enabled = False
78 eventer.send('World War III')
79 assert not producer.has_event('World War III')
80
81 evs = ('Event 1', 'Event 2', 'Event 3')
82 eventer.enabled = True
# Publish failure with buffering off: the error is expected to propagate.
# NOTE(review): presumably MockProducer raises KeyError when
# raise_on_publish is set -- confirm against its definition.
83 eventer.producer.raise_on_publish = True
84 eventer.buffer_while_offline = False
85 with pytest.raises(KeyError):
86 eventer.send('Event X')
# Publish failure with buffering on: these sends must not raise.
87 eventer.buffer_while_offline = True
88 for ev in evs:
89 eventer.send(ev)
# Once publishing works again, flush() must deliver every event sent while
# publishing was failing.
90 eventer.producer.raise_on_publish = False
91 eventer.flush()
92 for ev in evs:
93 assert producer.has_event(ev)
# Second flush() right after the first; it only needs to complete without
# raising (presumably nothing is left to send -- TODO confirm).
94 eventer.flush()
95
96 def test_send_buffer_group(self):
97 buf_received = [None]

Callers

nothing calls this directly

Calls 7

has_eventMethod · 0.95
MockProducerClass · 0.85
DispatcherMethod · 0.80
connection_for_writeMethod · 0.45
sendMethod · 0.45
raisesMethod · 0.45
flushMethod · 0.45

Tested by

no test coverage detected