Test successful connection acceptance.
(self)
| 521 | return worker |
| 522 | |
def test_accept_success(self):
    """Check that accepting one connection is counted and submitted.

    A stub listener returns a single fake client socket. After
    ``worker.accept`` runs, ``nr_conns`` must have gone from 0 to 1 and
    ``worker.tpool.submit`` must have been called exactly once.
    """
    wrk = self.create_worker()
    wrk.nr_conns = 0  # start from a known connection count

    # Stub listener: accept() yields one (socket, peer address) pair.
    peer_sock = FakeSocket()
    peer_addr = ('127.0.0.1', 12345)
    server = mock.Mock()
    server.accept.return_value = (peer_sock, peer_addr)
    server.getsockname.return_value = ('127.0.0.1', 8000)

    wrk.accept(server)

    assert wrk.nr_conns == 1
    wrk.tpool.submit.assert_called_once()
| 538 | |
| 539 | def test_accept_eagain(self): |
| 540 | """Test handling of EAGAIN during accept.""" |
nothing calls this directly
no test coverage detected