(envvar='CELERY_RDBSIG',
sig='SIGUSR2')
| 500 | |
| 501 | |
def install_rdb_handler(envvar='CELERY_RDBSIG',
                        sig='SIGUSR2'):  # pragma: no cover
    """Install a signal handler that opens an rdb breakpoint, if enabled.

    The handler is only installed when the environment variable named by
    ``envvar`` holds a non-empty value; otherwise this function does nothing.

    Arguments:
        envvar (str): Name of the environment variable that enables
            the handler.
        sig (str): Key under which the handler is stored in
            ``platforms.signals``.
    """

    def rdb_handler(*args):
        """Signal handler setting a rdb breakpoint at the current frame."""
        with in_sighandler():
            from celery.contrib.rdb import _frame, set_trace

            # gevent does not pass standard signal handler args
            # When args are given, the frame is the second one; otherwise
            # fall back to the frame that called this handler.  The
            # ``_frame().f_back`` lookup must stay in this function body:
            # moving it into a helper would change which frame is found.
            frame = args[1] if args else _frame().f_back
            set_trace(frame)

    # An unset or empty variable leaves ``platforms.signals`` untouched.
    if os.environ.get(envvar):
        platforms.signals[sig] = rdb_handler
| 515 | |
| 516 | |
| 517 | def install_HUP_not_supported_handler(worker, sig='SIGHUP'): |
no test coverage detected