(self, unreader, data)
| 38 | return ret |
| 39 | |
| 40 | def parse_trailers(self, unreader, data): |
| 41 | buf = io.BytesIO() |
| 42 | buf.write(data) |
| 43 | |
| 44 | idx = buf.getvalue().find(b"\r\n\r\n") |
| 45 | done = buf.getvalue()[:2] == b"\r\n" |
| 46 | while idx < 0 and not done: |
| 47 | self.get_data(unreader, buf) |
| 48 | idx = buf.getvalue().find(b"\r\n\r\n") |
| 49 | done = buf.getvalue()[:2] == b"\r\n" |
| 50 | if done: |
| 51 | unreader.unread(buf.getvalue()[2:]) |
| 52 | return b"" |
| 53 | self.req.trailers = self.req.parse_headers(buf.getvalue()[:idx], from_trailer=True) |
| 54 | unreader.unread(buf.getvalue()[idx + 4:]) |
| 55 | |
| 56 | def parse_chunked(self, unreader): |
| 57 | (size, rest) = self.parse_chunk_size(unreader) |
no test coverage detected