(self, unreader)
| 54 | unreader.unread(buf.getvalue()[idx + 4:]) |
| 55 | |
def parse_chunked(self, unreader):
    """Generate the payload bytes of a chunked body, piece by piece.

    Sizes come from ``self.parse_chunk_size``, which hands back a
    ``(size, leftover)`` pair; iteration ends as soon as the reported
    size is not greater than zero. A chunk may be yielded in several
    pieces when it spans more than one ``unreader.read()`` result.

    Raises:
        NoMoreData: ``unreader.read()`` returned nothing while a chunk
            was still incomplete.
        ChunkMissingTerminator: the two bytes following a chunk's data
            are not CRLF (this includes the case where the input ran
            out before two bytes were available).
    """
    remaining, pending = self.parse_chunk_size(unreader)
    while remaining > 0:
        # The chunk extends beyond what is buffered: emit the buffered
        # part, then pull more input until the rest of the chunk fits.
        while len(pending) < remaining:
            remaining -= len(pending)
            yield pending
            pending = unreader.read()
            if not pending:
                raise NoMoreData()
        yield pending[:remaining]

        # Consume the \r\n that must follow the chunk data; read more
        # input if fewer than two bytes are buffered after the chunk.
        tail = pending[remaining:]
        while len(tail) < 2:
            extra = unreader.read()
            if not extra:
                # Input exhausted; the check below reports the problem.
                break
            tail += extra
        terminator = tail[:2]
        if terminator != b'\r\n':
            raise ChunkMissingTerminator(terminator)

        # Whatever follows the terminator starts the next size line.
        remaining, pending = self.parse_chunk_size(unreader, data=tail[2:])
| 76 | |
| 77 | def parse_chunk_size(self, unreader, data=None): |
| 78 | buf = io.BytesIO() |
no test coverage detected