(
text_stream: t.IO[t.Any],
encoding: str | None,
errors: str | None,
is_binary: t.Callable[[t.IO[t.Any], bool], bool],
find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
force_readable: bool = False,
force_writable: bool = False,
)
| 236 | |
| 237 | |
def _force_correct_text_stream(
    text_stream: t.IO[t.Any],
    encoding: str | None,
    errors: str | None,
    is_binary: t.Callable[[t.IO[t.Any], bool], bool],
    find_binary: t.Callable[[t.IO[t.Any]], t.BinaryIO | None],
    force_readable: bool = False,
    force_writable: bool = False,
) -> t.TextIO:
    """Return a text stream for ``text_stream`` using the requested settings.

    :param text_stream: The stream to adapt; may be binary or text.
    :param encoding: Requested encoding, or ``None`` when none was given.
    :param errors: Requested error handler; ``None`` becomes ``"replace"``
        when a new wrapper has to be built.
    :param is_binary: Predicate called as ``is_binary(stream, False)`` to
        decide whether ``text_stream`` is already a binary stream.
    :param find_binary: Callable returning the binary stream underneath a
        text stream, or ``None`` when it cannot be obtained.
    :param force_readable: Passed through to ``_make_text_stream``.
    :param force_writable: Passed through to ``_make_text_stream``.
    :return: ``text_stream`` itself when it is a text stream that is either
        already acceptable or cannot be unwrapped; otherwise the result of
        ``_make_text_stream`` over the binary stream.
    """
    raw: t.BinaryIO

    if is_binary(text_stream, False):
        # Already binary: it only needs to be wrapped below.
        raw = t.cast(t.BinaryIO, text_stream)
    else:
        as_text = t.cast(t.TextIO, text_stream)

        # Keep a compatible stream untouched, except when no encoding was
        # requested and the stream would fall back to a misconfigured
        # ascii encoding.
        if _is_compatible_text_stream(as_text, encoding, errors):
            if encoding is not None or not _stream_is_misconfigured(as_text):
                return as_text

        # Not usable as-is: try to reach the binary stream underneath.
        unwrapped = find_binary(as_text)
        if unwrapped is None:
            # Nothing to rewrap; hand back the original stream and accept
            # mojibake rather than raising.
            return as_text
        raw = unwrapped

    # Build a text layer with the right encoding parameters, preferring
    # "replace" over strict so the result is something that works.
    return _make_text_stream(
        raw,
        encoding,
        "replace" if errors is None else errors,
        force_readable=force_readable,
        force_writable=force_writable,
    )
| 282 | |
| 283 | |
| 284 | def _force_correct_text_reader( |
no test coverage detected
searching dependent graphs…