(self, headers)
| 340 | return self.write |
| 341 | |
| 342 | def process_headers(self, headers): |
| 343 | for name, value in headers: |
| 344 | if not isinstance(name, str): |
| 345 | raise TypeError('%r is not a string' % name) |
| 346 | |
| 347 | if not TOKEN_RE.fullmatch(name): |
| 348 | raise InvalidHeaderName('%r' % name) |
| 349 | |
| 350 | if not isinstance(value, str): |
| 351 | raise TypeError('%r is not a string' % value) |
| 352 | |
| 353 | if not HEADER_VALUE_RE.fullmatch(value): |
| 354 | raise InvalidHeader('%r' % value) |
| 355 | |
| 356 | # RFC9110 5.5 |
| 357 | value = value.strip(" \t") |
| 358 | lname = name.lower() |
| 359 | if lname == "content-length": |
| 360 | self.response_length = int(value) |
| 361 | elif util.is_hoppish(name): |
| 362 | if lname == "connection": |
| 363 | # handle websocket |
| 364 | if value.lower() == "upgrade": |
| 365 | self.upgrade = True |
| 366 | elif lname == "upgrade": |
| 367 | if value.lower() == "websocket": |
| 368 | self.headers.append((name, value)) |
| 369 | |
| 370 | # ignore hopbyhop headers |
| 371 | continue |
| 372 | self.headers.append((name, value)) |
| 373 | |
| 374 | def is_chunked(self): |
| 375 | # Only use chunked responses when the client is |
no test coverage detected