MCPcopy
hub / github.com/aio-libs/aiohttp / Stream

Class Stream

tests/test_multipart.py:63–80  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

61
62
63class Stream(object):
64
65 def __init__(self, content):
66 self.content = io.BytesIO(content)
67
68 @asyncio.coroutine
69 def read(self, size=None):
70 return self.content.read(size)
71
72 def at_eof(self):
73 return self.content.tell() == len(self.content.getbuffer())
74
75 @asyncio.coroutine
76 def readline(self):
77 return self.content.readline()
78
79 def unread_data(self, data):
80 self.content = io.BytesIO(data + self.content.read())
81
82
83class StreamWithShortenRead(Stream):

Calls

no outgoing calls