()
| 49 | |
| 50 | |
async def main():
    """Crawl the pages reachable beneath ``base_url`` and print a summary.

    The start URL is placed on a work queue that ``concurrency`` worker
    coroutines consume.  Each worker fetches a URL, collects the links found
    on it, and re-queues those that start with ``base_url``.  Once the queue
    has drained (waiting at most 300 seconds), totals are printed and every
    worker is told to exit via a ``None`` sentinel.

    ``base_url``, ``concurrency``, ``get_links_from_url``, ``queues`` and
    ``gen`` are resolved from the enclosing module scope.
    """
    pending = queues.Queue()
    started_at = time.time()
    in_progress = set()
    succeeded = set()
    failed = set()

    async def fetch_url(target):
        # The same URL may be queued several times; only the first
        # occurrence is actually fetched.
        if target in in_progress:
            return

        print("fetching %s" % target)
        in_progress.add(target)
        links = await get_links_from_url(target)
        succeeded.add(target)

        for link in links:
            # Only follow links beneath the base URL
            if not link.startswith(base_url):
                continue
            await pending.put(link)

    async def worker():
        async for url in pending:
            if url is None:
                # Exit sentinel.  It is only enqueued after join() has
                # returned, so it is intentionally not marked task_done().
                return
            try:
                await fetch_url(url)
            except Exception as e:
                # Record the failure and keep the worker alive for other URLs.
                print(f"Exception: {e} {url}")
                failed.add(url)
            finally:
                # Every real URL taken from the queue must be acknowledged,
                # whether the fetch succeeded or not, so join() can complete.
                pending.task_done()

    await pending.put(base_url)

    # Start workers, then wait for the work queue to be empty.
    running_workers = gen.multi([worker() for _ in range(concurrency)])
    await pending.join(timeout=timedelta(seconds=300))

    # Every URL that was started must have ended up fetched or dead.
    assert in_progress == (succeeded | failed)
    elapsed = time.time() - started_at
    print("Done in %d seconds, fetched %s URLs." % (elapsed, len(succeeded)))
    print("Unable to fetch %s URLs." % len(failed))

    # Signal all the workers to exit: one sentinel per worker.
    for _ in range(concurrency):
        await pending.put(None)
    await running_workers
| 95 | |
| 96 | |
| 97 | if __name__ == "__main__": |
no test coverage detected