(
self,
feed_exporter_signal_handler: Callable[[], Awaitable[None] | None],
feed_slot_signal_handler: Callable[[Any], Awaitable[None] | None],
)
| 1312 | self.feed_slot_closed_received = True |
| 1313 | |
| 1314 | async def run_signaled_feed_exporter( |
| 1315 | self, |
| 1316 | feed_exporter_signal_handler: Callable[[], Awaitable[None] | None], |
| 1317 | feed_slot_signal_handler: Callable[[Any], Awaitable[None] | None], |
| 1318 | ) -> None: |
| 1319 | crawler = get_crawler(settings_dict=self.settings) |
| 1320 | feed_exporter = FeedExporter.from_crawler(crawler) |
| 1321 | spider = scrapy.Spider("default") |
| 1322 | spider.crawler = crawler |
| 1323 | crawler.signals.connect( |
| 1324 | feed_exporter_signal_handler, |
| 1325 | signal=signals.feed_exporter_closed, |
| 1326 | ) |
| 1327 | crawler.signals.connect( |
| 1328 | feed_slot_signal_handler, signal=signals.feed_slot_closed |
| 1329 | ) |
| 1330 | feed_exporter.open_spider(spider) |
| 1331 | for item in self.items: |
| 1332 | feed_exporter.item_scraped(item, spider) |
| 1333 | await feed_exporter.close_spider(spider) |
| 1334 | |
| 1335 | @coroutine_test |
| 1336 | async def test_feed_exporter_signals_sent(self) -> None: |
no test coverage detected