MCPcopy
hub / github.com/encode/httpx / MultipartStream

Class MultipartStream

httpx/_multipart.py:224–300  ·  view source on GitHub ↗

Request content as streaming multipart encoded form data.

Source from the content-addressed store, hash-verified

222
223
224class MultipartStream(SyncByteStream, AsyncByteStream):
225 """
226 Request content as streaming multipart encoded form data.
227 """
228
def __init__(
    self,
    data: RequestData,
    files: RequestFiles,
    boundary: bytes | None = None,
) -> None:
    """
    Set up the boundary, the content type and the list of fields.

    When `boundary` is `None`, a random one is generated from 16 bytes
    of `os.urandom`, hex-encoded as ASCII bytes. The fields produced by
    `_iter_fields(data, files)` are collected into a list up front.
    """
    self.boundary = (
        os.urandom(16).hex().encode("ascii") if boundary is None else boundary
    )

    # The header value is text, so it needs the boundary decoded from bytes.
    boundary_text = self.boundary.decode("ascii")
    self.content_type = "multipart/form-data; boundary=%s" % boundary_text

    self.fields = list(self._iter_fields(data, files))
243
def _iter_fields(
    self, data: RequestData, files: RequestFiles
) -> typing.Iterator[FileField | DataField]:
    """
    Yield a field object for every form value, then for every file.

    A `data` value that is a tuple or list yields one `DataField` per
    item, all under the same name; any other value yields a single
    `DataField`. `files` may be a mapping or an iterable of
    `(name, value)` pairs; each pair yields one `FileField`.
    """
    for key, entry in data.items():
        # Treat a scalar as a one-item sequence so both cases share a loop.
        entries = entry if isinstance(entry, (tuple, list)) else (entry,)
        for item in entries:
            yield DataField(name=key, value=item)

    if isinstance(files, typing.Mapping):
        uploads = files.items()
    else:
        uploads = files
    for key, upload in uploads:
        yield FileField(name=key, value=upload)
257
def iter_chunks(self) -> typing.Iterator[bytes]:
    """
    Yield the multipart body piece by piece.

    For each field this yields an opening boundary line, the chunks
    from the field's `render()`, and a trailing CRLF. After all fields
    it yields the closing boundary line.
    """
    for part in self.fields:
        opening_delimiter = b"--%s\r\n" % self.boundary
        yield opening_delimiter
        yield from part.render()
        yield b"\r\n"

    closing_delimiter = b"--%s--\r\n" % self.boundary
    yield closing_delimiter
264
265 def get_content_length(self) -> int | None:
266 """
267 Return the length of the multipart encoded content, or `None` if
268 any of the files have a length that cannot be determined upfront.
269 """
270 boundary_length = len(self.boundary)
271 length = 0
272
273 for field in self.fields:
274 field_length = field.get_length()
275 if field_length is None:
276 return None
277
278 length += 2 + boundary_length + 2 # b"--{boundary}\r\n"
279 length += field_length
280 length += 2 # b"\r\n"
281

Callers 1

encode_multipart_data · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected