Create a worker for testing.
(self)
| 500 | """Tests for connection acceptance.""" |
| 501 | |
def create_worker(self):
    """Build a ``gthread.ThreadWorker`` whose collaborators are mocks.

    The configuration sets 1 worker, 4 threads and 1000 worker
    connections. The worker is constructed with an empty socket list,
    a mock app and a mock logger; afterwards its ``poller``, ``tpool``
    and ``method_queue`` attributes are replaced with ``mock.Mock()``
    instances.

    Returns:
        The constructed worker.
    """
    settings = (
        ('workers', 1),
        ('threads', 4),
        ('worker_connections', 1000),
    )
    cfg = Config()
    for option, value in settings:
        cfg.set(option, value)

    worker_kwargs = {
        'age': 1,
        'ppid': os.getpid(),
        'sockets': [],
        'app': mock.Mock(),
        'timeout': 30,
        'cfg': cfg,
        'log': mock.Mock(),
    }
    worker = gthread.ThreadWorker(**worker_kwargs)

    # Swap the worker's collaborators for mocks after construction.
    for attr in ('poller', 'tpool', 'method_queue'):
        setattr(worker, attr, mock.Mock())
    return worker
| 522 | |
| 523 | def test_accept_success(self): |
| 524 | """Test successful connection acceptance.""" |
no test coverage detected