MCPcopy
hub / github.com/celery/celery / selection_rate_limit

Method selection_rate_limit

celery/events/cursesmon.py:173–189  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

171 pass
172
173 def selection_rate_limit(self):
174 if not self.selected_task:
175 return curses.beep()
176 task = self.state.tasks[self.selected_task]
177 if not task.name:
178 return curses.beep()
179
180 my, mx = self.win.getmaxyx()
181 r = 'New rate limit: '
182 self.win.addstr(my - 2, 3, r, curses.A_BOLD | curses.A_UNDERLINE)
183 self.win.addstr(my - 2, len(r) + 3, ' ' * (mx - len(r)))
184 rlimit = self.readline(my - 2, 3 + len(r))
185
186 if rlimit:
187 reply = self.app.control.rate_limit(task.name,
188 rlimit.strip(), reply=True)
189 self.alert_remote_control_reply(reply)
190
191 def alert_remote_control_reply(self, reply):
192

Callers

nothing calls this directly

Calls 4

readlineMethod · 0.95
getmaxyxMethod · 0.80
rate_limitMethod · 0.80

Tested by

no test coverage detected