MCPcopy
hub / github.com/celery/celery / test_on_elect_ack_lose

Method test_on_elect_ack_lose

t/unit/worker/test_consumer.py:1207–1213  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

1205 handler.assert_called_with('action')
1206
1207 def test_on_elect_ack_lose(self):
1208 c = self.Consumer(hostname='bar@x.com') # I will lose
1209 c.app.connection_for_read = _amqp_connection()
1210 g = Gossip(c)
1211 handler = g.election_handlers['topic'] = Mock()
1212 self.setup_election(g, c)
1213 handler.assert_not_called()
1214
1215 def test_on_elect_ack_win_but_no_action(self):
1216 c = self.Consumer(hostname='foo@x.com') # I will win

Callers

nothing calls this directly

Calls 4

ConsumerMethod · 0.95
setup_electionMethod · 0.95
GossipClass · 0.90
_amqp_connectionFunction · 0.85

Tested by

no test coverage detected