MCPcopy
hub / github.com/Textualize/rich / FileProxy

Class FileProxy

rich/file_proxy.py:11–60  ·  view source on GitHub ↗

Wraps a file (e.g. sys.stdout) and redirects writes to a console.

Source from the content-addressed store, hash-verified

9
10
class FileProxy(io.TextIOBase):
    """Wraps a file (e.g. sys.stdout) and redirects writes to a console.

    Written text is buffered until a newline is seen; each completed line is
    passed through an ``AnsiDecoder`` and printed via the console. Attribute
    lookups that are not satisfied by this class fall through to the wrapped
    file.
    """

    def __init__(self, console: "Console", file: IO[str]) -> None:
        """Create a proxy that prints to ``console`` in place of ``file``.

        Args:
            console: Console that receives the redirected output.
            file: The file being proxied; used for attribute delegation,
                ``fileno()`` and ``isatty()``.
        """
        self.__console = console
        self.__file = file
        # Fragments of the current line that has not yet seen its newline.
        self.__buffer: List[str] = []
        self.__ansi_decoder = AnsiDecoder()

    @property
    def rich_proxied_file(self) -> IO[str]:
        """Get proxied file."""
        return self.__file

    def __getattr__(self, name: str) -> Any:
        """Delegate attributes not found on the proxy to the wrapped file."""
        return getattr(self.__file, name)

    def write(self, text: str) -> int:
        """Buffer ``text`` and print every completed line to the console.

        Args:
            text: The string to write. Text after the last newline is kept
                in the buffer until a later ``write()`` or ``flush()``.

        Returns:
            The number of characters accepted, i.e. ``len(text)``.

        Raises:
            TypeError: If ``text`` is not a ``str``.
        """
        if not isinstance(text, str):
            raise TypeError(f"write() argument must be str, not {type(text).__name__}")
        # Capture the length up front: the loop below consumes ``text`` and
        # always leaves it empty, so measuring it afterwards would yield 0.
        written = len(text)
        buffer = self.__buffer
        lines: List[str] = []
        while text:
            line, new_line, text = text.partition("\n")
            if new_line:
                # A newline completes the pending line: prepend anything
                # buffered by earlier writes, then start a fresh buffer.
                lines.append("".join(buffer) + line)
                buffer.clear()
            else:
                # No newline left; keep the trailing fragment for later.
                buffer.append(line)
                break
        if lines:
            console = self.__console
            with console:
                output = Text("\n").join(
                    self.__ansi_decoder.decode_line(line) for line in lines
                )
                console.print(output)
        return written

    def flush(self) -> None:
        """Print any buffered partial line to the console and clear the buffer."""
        output = "".join(self.__buffer)
        if output:
            self.__console.print(output)
        del self.__buffer[:]

    def fileno(self) -> int:
        """Return the file descriptor of the wrapped file."""
        return self.__file.fileno()

    def isatty(self) -> bool:
        """Return whether the wrapped file reports being a TTY."""
        return self.__file.isatty()

Callers 5

test_empty_bytes · Function · 0.90
test_flush · Function · 0.90
test_new_lines · Function · 0.90
test_isatty · Function · 0.90
_enable_redirect_io · Method · 0.85

Calls

no outgoing calls

Tested by 4

test_empty_bytes · Function · 0.72
test_flush · Function · 0.72
test_new_lines · Function · 0.72
test_isatty · Function · 0.72