MCPcopy
hub / github.com/celery/celery / _context

Method _context

t/unit/worker/test_strategy.py:123–149  ·  view source on GitHub ↗
(self, sig,
                 rate_limits=True, events=True, utc=True, limit=None)

Source from the content-addressed store, hash-verified

121
122 @contextmanager
123 def _context(self, sig,
124 rate_limits=True, events=True, utc=True, limit=None):
125 assert sig.type.Strategy
126 assert sig.type.Request
127
128 reserved = Mock()
129 consumer = Mock()
130 # Create a proper mock for task_buckets that supports __getitem__
131 task_buckets_mock = Mock()
132 task_buckets_mock.__getitem__ = Mock(side_effect=lambda key: None)
133 consumer.task_buckets = task_buckets_mock
134 if limit:
135 bucket = TokenBucket(rate(limit), capacity=1)
136 task_buckets_mock.__getitem__.side_effect = (
137 lambda key: bucket if key == sig.task else None
138 )
139 consumer.controller.state.revoked = set()
140 consumer.disable_rate_limits = not rate_limits
141 consumer.event_dispatcher.enabled = events
142 s = sig.type.start_strategy(self.app, consumer, task_reserved=reserved)
143 assert s
144
145 message = self.task_message_from_sig(
146 self.app, sig, utc=utc, TaskMessage=self.get_message_class(),
147 )
148 message = self.prepare_message(message)
149 yield self.Context(sig, s, reserved, consumer, message)
150
151 def test_when_logging_disabled(self, caplog):
152 # Capture logs at any level above `NOTSET`

Calls 4

get_message_classMethod · 0.95
prepare_messageMethod · 0.95
rateFunction · 0.90
start_strategyMethod · 0.80

Tested by

no test coverage detected