hub / github.com/celery/celery / assert_broadcast_called

Method assert_broadcast_called

t/unit/app/test_control.py:73–94
(self, command,
 destination=None,
 callback=None,
 connection=None,
 limit=None,
 timeout=None,
 reply=True,
 pattern=None,
 matcher=None,
 **arguments)

Source from the content-addressed store, hash-verified

71          assert i._prepare([{'w1': {'ok': 1}}]) == {'ok': 1}
72
73      def assert_broadcast_called(self, command,
74                                  destination=None,
75                                  callback=None,
76                                  connection=None,
77                                  limit=None,
78                                  timeout=None,
79                                  reply=True,
80                                  pattern=None,
81                                  matcher=None,
82                                  **arguments):
83          self.app.control.broadcast.assert_called_with(
84              command,
85              arguments=arguments,
86              destination=destination or self.inspect.destination,
87              pattern=pattern or self.inspect.pattern,
88              matcher=matcher or self.inspect.destination,
89              callback=callback or self.inspect.callback,
90              connection=connection or self.inspect.connection,
91              limit=limit if limit is not None else self.inspect.limit,
92              timeout=timeout if timeout is not None else self.inspect.timeout,
93              reply=reply,
94          )
95
96      def test_active(self):
97          self.inspect.active()
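The helper in the listing above wraps `Mock.assert_called_with` and fills in any expectation the caller leaves out from the `inspect` fixture. A minimal standalone sketch of that pattern follows. It assumes `app` and `inspect` can be replaced by `SimpleNamespace` stand-ins (hypothetical, not the fixtures the real test class builds), and the simulated `broadcast(...)` call is made up for illustration rather than taken from Celery. The sketch mirrors the listing as written, including `matcher` falling back to `inspect.destination`.

```python
from types import SimpleNamespace
from unittest.mock import Mock

# Hypothetical stand-ins for the fixtures the real test class provides.
app = SimpleNamespace(control=SimpleNamespace(broadcast=Mock()))
inspect = SimpleNamespace(
    destination=None, pattern=None, callback=None,
    connection=None, limit=None, timeout=3.0,
)


def assert_broadcast_called(command, destination=None, callback=None,
                            connection=None, limit=None, timeout=None,
                            reply=True, pattern=None, matcher=None,
                            **arguments):
    # Same fallback logic as the listing, written without `self`.
    app.control.broadcast.assert_called_with(
        command,
        arguments=arguments,
        destination=destination or inspect.destination,
        pattern=pattern or inspect.pattern,
        matcher=matcher or inspect.destination,
        callback=callback or inspect.callback,
        connection=connection or inspect.connection,
        # `is not None` lets an explicit 0 override the fixture default.
        limit=limit if limit is not None else inspect.limit,
        timeout=timeout if timeout is not None else inspect.timeout,
        reply=reply,
    )


# Simulated call (an assumption made for this sketch).
app.control.broadcast(
    'active', arguments={'safe': True}, destination=None, pattern=None,
    matcher=None, callback=None, connection=None, limit=None,
    timeout=3.0, reply=True,
)

# Passes: every omitted expectation falls back to the fixture values.
assert_broadcast_called('active', safe=True)

# An explicit timeout=0 is kept as 0, so it no longer matches the 3.0 sent.
try:
    assert_broadcast_called('active', safe=True, timeout=0)
    mismatch_detected = False
except AssertionError:
    mismatch_detected = True
```

The `limit` and `timeout` arguments use an `is not None` check instead of `or`, so a test can assert on a literal `0`. The other arguments use `or`, so any falsy value falls back to the fixture default.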

Callers 15

test_active · Method · 0.95
test_active_safe · Method · 0.95
test_clock · Method · 0.95
test_conf · Method · 0.95
test_hello · Method · 0.95
test_memsample · Method · 0.95
test_memdump · Method · 0.95
test_objgraph · Method · 0.95
test_scheduled · Method · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected