(sock)
| 232 | fut = asyncio.Future() |
| 233 | |
def prog(sock):
    """Run the client side of the TLS exchange and report the outcome on ``fut``.

    Upgrades ``sock`` with ``client_sslctx``, connects to ``addr``, sends
    ``A_DATA`` and expects ``b'OK'`` back, then sends ``B_DATA`` and expects
    ``b'SPAM'``.  The socket is closed whether or not the exchange succeeds.

    Args:
        sock: socket-like object providing ``starttls``, ``connect``,
            ``send``, ``recv_all`` and ``close``.

    Returns:
        None.  Any exception raised during the exchange (including a failed
        assertion) is forwarded to ``fut.set_exception``; on success
        ``fut.set_result(None)`` is scheduled instead.
    """
    try:
        try:
            sock.starttls(client_sslctx)
            sock.connect(addr)
            sock.send(A_DATA)

            reply = sock.recv_all(2)
            self.assertEqual(reply, b'OK')

            sock.send(B_DATA)
            reply = sock.recv_all(4)
            self.assertEqual(reply, b'SPAM')
        finally:
            # Close on every path: previously a failed step or assertion
            # skipped the close and left the socket open.
            sock.close()
    except Exception as ex:
        # The result is handed over via call_soon_threadsafe -- presumably
        # because this function runs outside the loop's thread; confirm
        # against tcp_client.
        self.loop.call_soon_threadsafe(fut.set_exception, ex)
    else:
        self.loop.call_soon_threadsafe(fut.set_result, None)
| 253 | |
| 254 | client = self.tcp_client(prog) |
| 255 | client.start() |
nothing calls this directly
no test coverage detected