| 171 | |
| 172 | |
| 173 | def _find_binary_reader(stream: t.IO[t.Any]) -> t.BinaryIO | None: |
| 174 | # We need to figure out if the given stream is already binary. |
| 175 | # This can happen because the official docs recommend detaching |
| 176 | # the streams to get binary streams. Some code might do this, so |
| 177 | # we need to deal with this case explicitly. |
| 178 | if _is_binary_reader(stream, False): |
| 179 | return t.cast(t.BinaryIO, stream) |
| 180 | |
| 181 | buf = getattr(stream, "buffer", None) |
| 182 | |
| 183 | # Same situation here; this time we assume that the buffer is |
| 184 | # actually binary in case it's closed. |
| 185 | if buf is not None and _is_binary_reader(buf, True): |
| 186 | return t.cast(t.BinaryIO, buf) |
| 187 | |
| 188 | return None |
| 189 | |
| 190 | |
| 191 | def _find_binary_writer(stream: t.IO[t.Any]) -> t.BinaryIO | None: |