MCPcopy Index your code
hub / github.com/PyGithub/PyGithub / getStream

Method getStream

github/Requester.py:1021–1050  ·  view source on GitHub ↗

GET a stream from the server. Returns ``(status, headers, stream_chunk_iterator)``.

(
        self,
        url: str,
        parameters: dict[str, Any] | None = None,
        headers: dict[str, str] | None = None,
        cnx: HTTPRequestsConnectionClass | HTTPSRequestsConnectionClass | None = None,
        chunk_size: int | None | None = 1,
    )

Source from the content-addressed store, hash-verified

1019 f.write(chunk)
1020
def getStream(
    self,
    url: str,
    parameters: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    cnx: HTTPRequestsConnectionClass | HTTPSRequestsConnectionClass | None = None,
    chunk_size: int | None = 1,
) -> tuple[int, dict[str, Any], Iterator]:
    """
    GET a stream from the server.

    The request is always sent with ``Accept: application/octet-stream``; an
    ``Accept`` entry supplied in ``headers`` is overridden. The request is
    issued through ``__requestEncode`` with ``stream=True`` and
    ``follow_302_redirect=True``.

    :param url: forwarded unchanged to ``__requestEncode``
    :param parameters: forwarded unchanged to ``__requestEncode``
    :param headers: optional extra request headers; the mapping passed in is
        copied and never modified by this method
    :param cnx: optional connection object, forwarded unchanged to
        ``__requestEncode``
    :param chunk_size: forwarded to ``iter_content`` of the response object
    :returns: ``(status, headers, stream_chunk_iterator)``
    :raises TypeError: if the response object is not a ``RequestsResponse`` and
        does not provide both ``iter_content`` and ``raise_for_status``

    """
    # Build a private header mapping instead of writing into the caller's dict,
    # so a headers object reused across calls is not silently altered.
    request_headers = {**(headers or {}), "Accept": "application/octet-stream"}

    def encode(_: Any) -> tuple[str, str]:
        # Encoder callback handed to __requestEncode: ignores its argument and
        # always returns a pair of empty strings.
        return "", ""

    status, responseHeaders, output = self.__requestEncode(
        cnx, "GET", url, parameters, request_headers, None, encode, stream=True, follow_302_redirect=True
    )
    # Accept either the project's response wrapper or any object exposing the
    # two methods used below.
    if isinstance(output, RequestsResponse) or (
        hasattr(output, "iter_content") and hasattr(output, "raise_for_status")
    ):
        # Called before the iterator is handed out, so anything it raises
        # propagates to the caller instead of surfacing mid-iteration.
        output.raise_for_status()
        return status, responseHeaders, output.iter_content(chunk_size=chunk_size)
    raise TypeError(f"Expected a RequestsResponse object: {type(output)}")
1051
1052 def requestJson(
1053 self,

Callers 2

getFile (Method) · 0.95
download_asset (Method) · 0.80

Calls 3

__requestEncode (Method) · 0.95
raise_for_status (Method) · 0.45
iter_content (Method) · 0.45

Tested by

no test coverage detected