Create a worker for testing.
(self)
| 567 | """Tests for graceful shutdown behavior.""" |
| 568 | |
| 569 | def create_worker(self): |
| 570 | """Create a worker for testing.""" |
| 571 | cfg = Config() |
| 572 | cfg.set('workers', 1) |
| 573 | cfg.set('threads', 4) |
| 574 | cfg.set('worker_connections', 1000) |
| 575 | cfg.set('graceful_timeout', 5) |
| 576 | |
| 577 | worker = gthread.ThreadWorker( |
| 578 | age=1, |
| 579 | ppid=os.getpid(), |
| 580 | sockets=[], |
| 581 | app=mock.Mock(), |
| 582 | timeout=30, |
| 583 | cfg=cfg, |
| 584 | log=mock.Mock(), |
| 585 | ) |
| 586 | return worker |
| 587 | |
| 588 | def test_handle_exit_sets_alive_false(self): |
| 589 | """Test that handle_exit begins graceful shutdown.""" |
no test coverage detected