Copy data from a url to a local file.
(task_id: TaskID, url: str, path: str)
| 45 | |
| 46 | |
def copy_url(task_id: TaskID, url: str, path: str) -> None:
    """Copy data from a url to a local file.

    The response body is streamed to ``path`` in 32 KiB chunks, and the
    progress task ``task_id`` is advanced by the size of each chunk written.
    If ``done_event`` becomes set, the copy stops early and whatever has been
    written so far is left in ``path``.

    Args:
        task_id: Progress task to update as data arrives.
        url: Address requested with ``urlopen``.
        path: Destination file, opened in binary write mode.

    Raises:
        Nothing is caught here: errors from ``urlopen``, ``open`` or the
        read/write calls propagate to the caller.
    """
    progress.console.log(f"Requesting {url}")
    # Using the response as a context manager closes the connection on every
    # exit path: normal completion, the early return below, and exceptions.
    with urlopen(url) as response:
        # Indexing the headers yields None when the header is absent, and a
        # server may send a malformed value; neither should abort the copy.
        try:
            total = int(response.info()["Content-length"])
        except (TypeError, ValueError):
            total = None
        if total is not None:
            progress.update(task_id, total=total)
        # Otherwise the task keeps whatever total it was created with.
        with open(path, "wb") as dest_file:
            progress.start_task(task_id)
            # iter(callable, sentinel): keep reading until read() returns b"".
            for data in iter(partial(response.read, 32768), b""):
                dest_file.write(data)
                progress.update(task_id, advance=len(data))
                if done_event.is_set():
                    return
    progress.console.log(f"Downloaded {path}")
| 61 | |
| 62 | |
| 63 | def download(urls: Iterable[str], dest_dir: str): |
nothing calls this directly
no test coverage detected