(length: int)
| 186 | |
@staticmethod
async def get_async_iterable_with_delays(length: int) -> AsyncGenerator[int, None]:
    """Asynchronously yield ``0 .. length - 1``, pausing before some items.

    Before each value there is a 10% chance (``random.random() < 0.1``) of
    waiting on a Deferred that the reactor is asked to fire after a random
    delay below 0.05 (``random.random() / 20``).  This simulates a callback
    whose results arrive with delays between some of them.
    """
    from twisted.internet import reactor

    for index in range(length):
        should_pause = random.random() < 0.1
        if should_pause:
            pause_for = random.random() / 20
            resume: Deferred[None] = Deferred()
            # Schedule the Deferred to be fired with None after the delay,
            # then await it (via maybe_deferred_to_future) before yielding.
            reactor.callLater(pause_for, resume.callback, None)
            await maybe_deferred_to_future(resume)
        yield index
| 199 | |
| 200 | @inline_callbacks_test |
| 201 | def test_simple(self): |
no test coverage detected