MCPcopy
hub / github.com/Textualize/rich / _TrackThread

Class _TrackThread

rich/progress.py:64–101  ·  view source on GitHub ↗

A thread to periodically update progress.

Source from the content-addressed store, hash-verified

62
63
64class _TrackThread(Thread):
65 """A thread to periodically update progress."""
66
67 def __init__(self, progress: "Progress", task_id: "TaskID", update_period: float):
68 self.progress = progress
69 self.task_id = task_id
70 self.update_period = update_period
71 self.done = Event()
72
73 self.completed = 0
74 super().__init__(daemon=True)
75
76 def run(self) -> None:
77 task_id = self.task_id
78 advance = self.progress.advance
79 update_period = self.update_period
80 last_completed = 0
81 wait = self.done.wait
82 while not wait(update_period) and self.progress.live.is_started:
83 completed = self.completed
84 if last_completed != completed:
85 advance(task_id, completed - last_completed)
86 last_completed = completed
87
88 self.progress.update(self.task_id, completed=self.completed, refresh=True)
89
90 def __enter__(self) -> "_TrackThread":
91 self.start()
92 return self
93
94 def __exit__(
95 self,
96 exc_type: Optional[Type[BaseException]],
97 exc_val: Optional[BaseException],
98 exc_tb: Optional[TracebackType],
99 ) -> None:
100 self.done.set()
101 self.join()
102
103
104def track(

Callers 2

test_track_threadFunction · 0.90
trackMethod · 0.85

Calls

no outgoing calls

Tested by 1

test_track_threadFunction · 0.72