(stream: t.IO[t.Any])
| 189 | |
| 190 | |
def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None:
    """Locate a binary writer for ``stream``.

    The stream itself is tried first, then its ``buffer`` attribute.

    :param stream: the stream to inspect.
    :return: ``stream`` or ``stream.buffer`` cast to ``t.BinaryIO``,
        whichever first passes ``_is_binary_writer``; ``None`` if
        neither does.
    """
    # The stream handed to us may already be binary: the official docs
    # recommend detaching the standard streams to obtain binary ones,
    # and some code does exactly that, so handle it explicitly.
    if _is_binary_writer(stream, False):
        return t.cast(t.BinaryIO, stream)

    underlying = getattr(stream, "buffer", None)
    if underlying is None:
        return None

    # Same check for the underlying buffer, except that here a closed
    # buffer is assumed to actually be binary.
    if not _is_binary_writer(underlying, True):
        return None

    return t.cast(t.BinaryIO, underlying)
| 207 | |
| 208 | |
| 209 | def _stream_is_misconfigured(stream: t.TextIO) -> bool: |
no test coverage detected