(self, event: StreamEnded)
| 432 | self.metadata["certificate"] = Certificate(self.transport.getPeerCertificate()) |
| 433 | |
| 434 | def stream_ended(self, event: StreamEnded) -> None: |
| 435 | try: |
| 436 | stream = self.pop_stream(event.stream_id) |
| 437 | except KeyError: |
| 438 | pass # We ignore server-initiated events |
| 439 | else: |
| 440 | stream.close(StreamCloseReason.ENDED, from_protocol=True) |
| 441 | |
| 442 | def stream_reset(self, event: StreamReset) -> None: |
| 443 | try: |
no test coverage detected