(self)
| 309 | self.assert_pending() |
| 310 | |
| 311 | def test_success_after_timeout(self): |
| 312 | conn, future = self.start_connect(self.addrinfo) |
| 313 | self.assert_pending((AF1, "a")) |
| 314 | conn.on_timeout() |
| 315 | self.assert_pending((AF1, "a"), (AF2, "c")) |
| 316 | self.resolve_connect(AF1, "a", True) |
| 317 | self.assertEqual(future.result(), (AF1, "a", self.streams["a"])) |
| 318 | # resolving 'c' after completion closes the connection. |
| 319 | self.resolve_connect(AF2, "c", True) |
| 320 | self.assertTrue(self.streams.pop("c").closed) |
| 321 | |
| 322 | def test_all_fail(self): |
| 323 | conn, future = self.start_connect(self.addrinfo) |
nothing calls this directly
no test coverage detected