MCPcopy
hub / github.com/urllib3/urllib3 / body_to_chunks

Function body_to_chunks

src/urllib3/util/request.py:197–263  ·  view source on GitHub ↗

Takes the HTTP request method, body, and blocksize and transforms them into an iterable of chunks to pass to socket.sendall() and an optional 'Content-Length' header. A 'Content-Length' of 'None' indicates that the length of the body can't be determined, so 'Transfer-Encoding: chunked' should be used for framing instead.

(
    body: typing.Any | None, method: str, blocksize: int
)

Source from the content-addressed store, hash-verified

195
196
197def body_to_chunks(
198 body: typing.Any | None, method: str, blocksize: int
199) -> ChunksAndContentLength:
200 """Takes the HTTP request method, body, and blocksize and
201 transforms them into an iterable of chunks to pass to
202 socket.sendall() and an optional 'Content-Length' header.
203
204 A 'Content-Length' of 'None' indicates the length of the body
205 can't be determined so should use 'Transfer-Encoding: chunked'
206 for framing instead.
207 """
208
209 chunks: typing.Iterable[bytes] | None
210 content_length: int | None
211
212 # No body, we need to make a recommendation on 'Content-Length'
213 # based on whether that request method is expected to have
214 # a body or not.
215 if body is None:
216 chunks = None
217 if method.upper() not in _METHODS_NOT_EXPECTING_BODY:
218 content_length = 0
219 else:
220 content_length = None
221
222 # Bytes or strings become bytes
223 elif isinstance(body, (str, bytes)):
224 chunks = (to_bytes(body),)
225 content_length = len(chunks[0])
226
227 # File-like object, TODO: use seek() and tell() for length?
228 elif hasattr(body, "read"):
229
230 def chunk_readable() -> typing.Iterable[bytes]:
231 encode = isinstance(body, io.TextIOBase)
232 while True:
233 datablock = body.read(blocksize)
234 if not datablock:
235 break
236 if encode:
237 datablock = datablock.encode("utf-8")
238 yield datablock
239
240 chunks = chunk_readable()
241 content_length = None
242
243 # Otherwise we need to start checking via duck-typing.
244 else:
245 try:
246 # Check if the body implements the buffer API.
247 mv = memoryview(body)
248 except TypeError:
249 try:
250 # Check if the body is an iterable
251 chunks = iter(body)
252 content_length = None
253 except TypeError:
254 raise TypeError(

Callers 1

request (Method) · 0.85

Calls 3

to_bytes (Function) · 0.85
chunk_readable (Function) · 0.85

Tested by

no test coverage detected