(b: Union[bytes, bytearray])
| 347 | if request.streaming_callback: |
| 348 | |
| 349 | def write_function(b: Union[bytes, bytearray]) -> int: |
| 350 | assert request.streaming_callback is not None |
| 351 | self.io_loop.add_callback(request.streaming_callback, b) |
| 352 | return len(b) |
| 353 | |
| 354 | else: |
| 355 | write_function = buffer.write # type: ignore |
nothing calls this directly
no test coverage detected