Updates the message in place to correct RFC 2047 display names in address headers that were misparsed because of https://github.com/python/cpython/issues/128110.
(message, msg_bytes)
| 60 | |
| 61 | |
| 62 | def _apply_cpython_128110_workaround(message, msg_bytes): |
| 63 | """ |
| 64 | Updates message in place to correct misparsed rfc2047 display-names in |
| 65 | address headers caused by https://github.com/python/cpython/issues/128110. |
| 66 | """ |
| 67 | from email.header import decode_header |
| 68 | from email.headerregistry import AddressHeader |
| 69 | from email.parser import BytesHeaderParser |
| 70 | from email.utils import getaddresses |
| 71 | |
| 72 | def rfc2047_decode(s): |
| 73 | # Decode using legacy decode_header() (which doesn't have the bug). |
| 74 | return "".join( |
| 75 | ( |
| 76 | segment |
| 77 | if charset is None and isinstance(segment, str) |
| 78 | else segment.decode(charset or "ascii") |
| 79 | ) |
| 80 | for segment, charset in decode_header(s) |
| 81 | ) |
| 82 | |
| 83 | def build_address(name, address): |
| 84 | if "@" in address: |
| 85 | return Address(display_name=name, addr_spec=address) |
| 86 | return Address(display_name=name, username=address, domain="") |
| 87 | |
| 88 | # This workaround only applies to messages parsed with a modern policy. |
| 89 | assert not isinstance(message.policy, policy.Compat32) |
| 90 | |
| 91 | # Reparse with compat32 to get access to raw (undecoded) headers. |
| 92 | raw_headers = BytesHeaderParser(policy=policy.compat32).parsebytes(msg_bytes) |
| 93 | for header, modern_value in message.items(): |
| 94 | if not isinstance(modern_value, AddressHeader): |
| 95 | # The bug only affects structured address headers. |
| 96 | continue |
| 97 | raw_value = raw_headers[header] |
| 98 | if RFC2047_PREFIX in raw_value: |
| 99 | # Headers should not appear more than once. |
| 100 | assert len(message.get_all(header)) == 1 |
| 101 | # Reconstruct Address objects using legacy APIs. |
| 102 | unfolded = raw_value.replace("\r\n", "").replace("\n", "") |
| 103 | corrected_addresses = ( |
| 104 | build_address(rfc2047_decode(name), address) |
| 105 | for name, address in getaddresses([unfolded]) |
| 106 | ) |
| 107 | message.replace_header(header, corrected_addresses) |
| 108 | |
| 109 | |
| 110 | def message_from_bytes(s): |
no test coverage detected