Encode a dict of values (strings, file descriptors, or :class:`FileStorage` objects) into a multipart-encoded string stored in a file descriptor.

.. versionchanged:: 3.0
    The ``charset`` parameter was removed.
(
data: t.Mapping[str, t.Any],
use_tempfile: bool = True,
threshold: int = 1024 * 500,
boundary: str | None = None,
)
| 52 | |
| 53 | |
| 54 | def stream_encode_multipart( |
| 55 | data: t.Mapping[str, t.Any], |
| 56 | use_tempfile: bool = True, |
| 57 | threshold: int = 1024 * 500, |
| 58 | boundary: str | None = None, |
| 59 | ) -> tuple[t.IO[bytes], int, str]: |
| 60 | """Encode a dict of values (either strings or file descriptors or |
| 61 | :class:`FileStorage` objects.) into a multipart encoded string stored |
| 62 | in a file descriptor. |
| 63 | |
| 64 | .. versionchanged:: 3.0 |
| 65 | The ``charset`` parameter was removed. |
| 66 | """ |
| 67 | if boundary is None: |
| 68 | boundary = f"---------------WerkzeugFormPart_{time()}{random()}" |
| 69 | |
| 70 | stream: t.IO[bytes] = BytesIO() |
| 71 | total_length = 0 |
| 72 | on_disk = False |
| 73 | write_binary: t.Callable[[bytes], int] |
| 74 | |
| 75 | if use_tempfile: |
| 76 | |
| 77 | def write_binary(s: bytes) -> int: |
| 78 | nonlocal stream, total_length, on_disk |
| 79 | |
| 80 | if on_disk: |
| 81 | return stream.write(s) |
| 82 | else: |
| 83 | length = len(s) |
| 84 | |
| 85 | if length + total_length <= threshold: |
| 86 | stream.write(s) |
| 87 | else: |
| 88 | new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+")) |
| 89 | new_stream.write(stream.getvalue()) # type: ignore |
| 90 | new_stream.write(s) |
| 91 | stream = new_stream |
| 92 | on_disk = True |
| 93 | |
| 94 | total_length += length |
| 95 | return length |
| 96 | |
| 97 | else: |
| 98 | write_binary = stream.write |
| 99 | |
| 100 | encoder = MultipartEncoder(boundary.encode()) |
| 101 | write_binary(encoder.send_event(Preamble(data=b""))) |
| 102 | for key, value in _iter_data(data): |
| 103 | reader = getattr(value, "read", None) |
| 104 | if reader is not None: |
| 105 | filename = getattr(value, "filename", getattr(value, "name", None)) |
| 106 | content_type = getattr(value, "content_type", None) |
| 107 | if content_type is None: |
| 108 | content_type = ( |
| 109 | filename |
| 110 | and mimetypes.guess_type(filename)[0] |
| 111 | or "application/octet-stream" |
no test coverage detected