(self)
| 235 | self.__writeLine(str(input).replace("\n", "").replace("\r", "")) |
| 236 | |
def getresponse(self):
    """Fetch the wrapped connection's response, record it, and return a copy.

    The status, the header list and the body are passed to ``__writeLine``
    in that order, followed by an empty string. When ``self.__stream`` is
    set, the body is pulled from the response with
    ``iter_content(chunk_size=64)`` and each chunk is written as one
    base64-encoded ASCII line; otherwise the result of ``read()`` is
    written unchanged.

    The pending request is then completed via ``with_response``, handed to
    ``addRequest``, and cleared.

    Returns:
        A ``FakeHttpResponse`` built from the status, the headers and the
        complete body.
    """
    response = self.__cnx.getresponse()

    status = response.status
    headers = response.getheaders()
    # Non-streamed bodies are read up front, before anything is recorded.
    if self.__stream:
        output = response
    else:
        output = response.read()

    self.__writeLine(status)
    self.__writeLine(list(headers))

    if self.__stream:
        pieces = list(output.iter_content(chunk_size=64))
        # Reassemble the full body first; callers receive it as one value.
        output = b"".join(pieces)
        for piece in pieces:
            encoded = base64.b64encode(piece)
            self.__writeLine(encoded.decode("ascii"))
    else:
        self.__writeLine(output)
    # Both paths finish the record with an empty string.
    self.__writeLine("")

    self.addRequest(self.__request.with_response(status, dict(headers), output))
    self.__request = None

    return FakeHttpResponse(status, headers, output)
| 260 | |
def close(self):
    """Close the wrapped connection and return the result of its ``close()``."""
    wrapped = self.__cnx
    return wrapped.close()
no test coverage detected