hub / github.com/pallets/werkzeug / stream_encode_multipart

Function stream_encode_multipart

src/werkzeug/test.py:54–141

Encode a dict of values (strings, file descriptors, or :class:`FileStorage` objects) into a multipart-encoded byte stream stored in a file object.

.. versionchanged:: 3.0
    The ``charset`` parameter was removed.

(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: str | None = None,
)
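The `use_tempfile` and `threshold` parameters control when the encoded output moves from memory to a temporary file. The following is a minimal standalone sketch of that spill-to-disk pattern, assuming only the standard library; `make_spilling_writer` and `get_stream` are hypothetical names for illustration and are not part of Werkzeug.

```python
from io import BytesIO
from tempfile import TemporaryFile
import typing as t


def make_spilling_writer(threshold: int):
    """Return (write, get_stream). Hypothetical helper, not a Werkzeug API."""
    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False

    def write(chunk: bytes) -> int:
        nonlocal stream, total_length, on_disk

        # Once spilled, every write goes straight to the temporary file.
        if on_disk:
            return stream.write(chunk)

        if total_length + len(chunk) <= threshold:
            # Still within the threshold: keep buffering in memory.
            stream.write(chunk)
        else:
            # Threshold exceeded: copy the buffered bytes to a temp file
            # and switch the active stream over to it.
            spilled = TemporaryFile("wb+")
            spilled.write(stream.getvalue())  # type: ignore[attr-defined]
            spilled.write(chunk)
            stream = spilled
            on_disk = True

        total_length += len(chunk)
        return len(chunk)

    def get_stream() -> t.IO[bytes]:
        return stream

    return write, get_stream


write, get_stream = make_spilling_writer(threshold=8)
write(b"12345")
in_memory = isinstance(get_stream(), BytesIO)    # 5 bytes fit in memory
write(b"67890")
spilled = not isinstance(get_stream(), BytesIO)  # 10 bytes exceed the threshold
get_stream().seek(0)
content = get_stream().read()
```

A closure with `nonlocal` keeps the active stream swappable without callers having to know which backing store is in use.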

Source from the content-addressed store, hash-verified

def stream_encode_multipart(
    data: t.Mapping[str, t.Any],
    use_tempfile: bool = True,
    threshold: int = 1024 * 500,
    boundary: str | None = None,
) -> tuple[t.IO[bytes], int, str]:
    """Encode a dict of values (either strings or file descriptors or
    :class:`FileStorage` objects.) into a multipart encoded string stored
    in a file descriptor.

    .. versionchanged:: 3.0
        The ``charset`` parameter was removed.
    """
    if boundary is None:
        boundary = f"---------------WerkzeugFormPart_{time()}{random()}"

    stream: t.IO[bytes] = BytesIO()
    total_length = 0
    on_disk = False
    write_binary: t.Callable[[bytes], int]

    if use_tempfile:

        def write_binary(s: bytes) -> int:
            nonlocal stream, total_length, on_disk

            if on_disk:
                return stream.write(s)
            else:
                length = len(s)

                if length + total_length <= threshold:
                    stream.write(s)
                else:
                    new_stream = t.cast(t.IO[bytes], TemporaryFile("wb+"))
                    new_stream.write(stream.getvalue())  # type: ignore
                    new_stream.write(s)
                    stream = new_stream
                    on_disk = True

                total_length += length
                return length

    else:
        write_binary = stream.write

    encoder = MultipartEncoder(boundary.encode())
    write_binary(encoder.send_event(Preamble(data=b"")))
    for key, value in _iter_data(data):
        reader = getattr(value, "read", None)
        if reader is not None:
            filename = getattr(value, "filename", getattr(value, "name", None))
            content_type = getattr(value, "content_type", None)
            if content_type is None:
                content_type = (
                    filename
                    and mimetypes.guess_type(filename)[0]
                    or "application/octet-stream"
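The listing breaks off inside the content-type fallback. The expression visible there can be tried on its own; the sketch below wraps it in a hypothetical helper, `guess_content_type`, which is not a Werkzeug function, and uses only the standard `mimetypes` module.

```python
import mimetypes


def guess_content_type(filename):
    """Hypothetical wrapper around the fallback expression in the listing."""
    # A missing filename, or an extension mimetypes does not recognise,
    # falls through to the generic binary type.
    return (
        filename
        and mimetypes.guess_type(filename)[0]
        or "application/octet-stream"
    )


no_name = guess_content_type(None)
unknown_ext = guess_content_type("payload.zzunknownext")
text_file = guess_content_type("notes.txt")
```

The `and`/`or` chain relies on `None` and the empty string both being falsy, so a single expression covers the missing-filename and unknown-extension cases.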

Callers 6

test_chunked_request · Function · 0.90
encode_multipart · Function · 0.85
get_environ · Method · 0.85

Calls 13

send_event · Method · 0.95
MultipartEncoder · Class · 0.85
write_binary · Function · 0.85
Preamble · Class · 0.85
_iter_data · Function · 0.85
Field · Class · 0.85
File · Class · 0.85
Data · Class · 0.85
Headers · Class · 0.85
Epilogue · Class · 0.85
update · Method · 0.45
tell · Method · 0.45

Tested by

no test coverage detected