Send data on a stream. Args: stream_id: The stream ID data: Body data bytes end_stream: Whether this ends the stream Returns: bool: True if data sent, False if stream was already closed
(self, stream_id, data, end_stream=False)
| 438 | return self.h2_conn.local_flow_control_window(stream_id) |
| 439 | |
async def send_data(self, stream_id, data, end_stream=False):
    """Send data on a stream, chunked to fit the flow-control window.

    The payload is written in chunks no larger than the stream's current
    local flow-control window and ``self.max_frame_size``. When no window
    is available, pending output is flushed and the coroutine waits for
    the window to reopen (WINDOW_UPDATE, RFC 7540 Section 6.9.2).

    Args:
        stream_id: The stream ID
        data: Body data bytes (may be empty)
        end_stream: Whether this ends the stream

    Returns:
        bool: True if data sent. False if ``stream_id`` is not a known
        stream, if the flow-control window did not reopen, or if the
        stream was already closed / a flow-control error occurred (in
        which case the stream is closed and cleaned up).
    """
    stream = self.streams.get(stream_id)
    if stream is None:
        return False

    total = len(data) if data else 0
    # Track a cursor instead of re-slicing the remainder on every chunk,
    # which would copy the unsent tail each iteration.
    offset = 0
    try:
        if total == 0 and end_stream:
            # There is nothing to chunk, but END_STREAM must still reach
            # the wire; otherwise the stream is only marked ended locally.
            self.h2_conn.send_data(stream_id, b"", end_stream=True)
            await self._send_pending_data()

        while offset < total:
            remaining = total - offset
            available = self.h2_conn.local_flow_control_window(stream_id)
            chunk_size = min(available, self.max_frame_size, remaining)

            if chunk_size <= 0:
                # Wait for WINDOW_UPDATE per RFC 7540 Section 6.9.2
                await self._send_pending_data()
                available = await self._wait_for_flow_control_window(stream_id)
                if available <= 0:
                    return False
                chunk_size = min(available, self.max_frame_size, remaining)

            chunk = data[offset:offset + chunk_size]
            offset += chunk_size
            # Only the last chunk may carry END_STREAM.
            is_final = end_stream and offset >= total

            self.h2_conn.send_data(stream_id, chunk, end_stream=is_final)
            await self._send_pending_data()

        # Mirror the completed send on the local stream object.
        stream.send_data(data, end_stream=end_stream)
        return True
    except (_h2_exceptions.StreamClosedError, _h2_exceptions.FlowControlError):
        stream.close()
        self.cleanup_stream(stream_id)
        return False
| 482 | |
| 483 | async def send_trailers(self, stream_id, trailers): |
| 484 | """Send trailing headers on a stream. |