(self, response, body=None,
headers=None, chunked=False, write_body=None)
| 208 | return aiohttp.Response(self._srv.writer, code) |
| 209 | |
| 210 | def _response(self, response, body=None, |
| 211 | headers=None, chunked=False, write_body=None): |
| 212 | r_headers = {} |
| 213 | for key, val in self._headers.items(): |
| 214 | key = '-'.join(p.capitalize() for p in key.split('-')) |
| 215 | r_headers[key] = val |
| 216 | |
| 217 | encoding = self._headers.get('content-encoding', '').lower() |
| 218 | if 'gzip' in encoding: # pragma: no cover |
| 219 | cmod = 'gzip' |
| 220 | elif 'deflate' in encoding: |
| 221 | cmod = 'deflate' |
| 222 | else: |
| 223 | cmod = '' |
| 224 | |
| 225 | resp = { |
| 226 | 'method': self._method, |
| 227 | 'version': '%s.%s' % self._version, |
| 228 | 'path': self._uri, |
| 229 | 'headers': r_headers, |
| 230 | 'origin': self._transport.get_extra_info('addr', ' ')[0], |
| 231 | 'query': self._query, |
| 232 | 'form': {}, |
| 233 | 'compression': cmod, |
| 234 | 'multipart-data': [] |
| 235 | } |
| 236 | if body: # pragma: no cover |
| 237 | resp['content'] = body |
| 238 | else: |
| 239 | resp['content'] = self._body.decode('utf-8', 'ignore') |
| 240 | |
| 241 | ct = self._headers.get('content-type', '').lower() |
| 242 | |
| 243 | # application/x-www-form-urlencoded |
| 244 | if ct == 'application/x-www-form-urlencoded': |
| 245 | resp['form'] = urllib.parse.parse_qs(self._body.decode('latin1')) |
| 246 | |
| 247 | # multipart/form-data |
| 248 | elif ct.startswith('multipart/form-data'): # pragma: no cover |
| 249 | out = io.BytesIO() |
| 250 | for key, val in self._headers.items(): |
| 251 | out.write(bytes('{}: {}\r\n'.format(key, val), 'latin1')) |
| 252 | |
| 253 | out.write(b'\r\n') |
| 254 | out.write(self._body) |
| 255 | out.write(b'\r\n') |
| 256 | out.seek(0) |
| 257 | |
| 258 | message = email.parser.BytesParser().parse(out) |
| 259 | if message.is_multipart(): |
| 260 | for msg in message.get_payload(): |
| 261 | if msg.is_multipart(): |
| 262 | logging.warning('multipart msg is not expected') |
| 263 | else: |
| 264 | key, params = cgi.parse_header( |
| 265 | msg.get('content-disposition', '')) |
| 266 | params['data'] = msg.get_payload() |
| 267 | params['content-type'] = msg.get_content_type() |
no test coverage detected