Test that a Connection object will call close() on the stream that it holds.
(request)
| 720 | |
| 721 | |
| 722 | async def test_connection_garbage_collection(request): |
| 723 | """ |
| 724 | Test that a Connection object will call close() on the |
| 725 | stream that it holds. |
| 726 | """ |
| 727 | |
| 728 | url: str = request.config.getoption("--redis-url") |
| 729 | pool = ConnectionPool.from_url(url) |
| 730 | |
| 731 | # create a client with a connection from the pool |
| 732 | client = Redis(connection_pool=pool, single_connection_client=True) |
| 733 | await client.initialize() |
| 734 | conn = client.connection |
| 735 | |
| 736 | with mock.patch.object(conn, "_reader"): |
| 737 | with mock.patch.object(conn, "_writer") as a: |
| 738 | # we cannot, in unittests, or from asyncio, reliably trigger |
| 739 | # garbage collection so we must just invoke the handler |
| 740 | with pytest.warns(ResourceWarning): |
| 741 | conn.__del__() |
| 742 | assert a.close.called |
| 743 | |
| 744 | await client.aclose() |
| 745 | await pool.aclose() |
| 746 | |
| 747 | |
| 748 | @pytest.mark.parametrize( |