hub / github.com/scrapy/scrapy / run_signaled_feed_exporter

Method run_signaled_feed_exporter

tests/test_feedexport.py:1314–1333
(
    self,
    feed_exporter_signal_handler: Callable[[], Awaitable[None] | None],
    feed_slot_signal_handler: Callable[[Any], Awaitable[None] | None],
)

Source from the content-addressed store, hash-verified

        self.feed_slot_closed_received = True

    async def run_signaled_feed_exporter(
        self,
        feed_exporter_signal_handler: Callable[[], Awaitable[None] | None],
        feed_slot_signal_handler: Callable[[Any], Awaitable[None] | None],
    ) -> None:
        crawler = get_crawler(settings_dict=self.settings)
        feed_exporter = FeedExporter.from_crawler(crawler)
        spider = scrapy.Spider("default")
        spider.crawler = crawler
        crawler.signals.connect(
            feed_exporter_signal_handler,
            signal=signals.feed_exporter_closed,
        )
        crawler.signals.connect(
            feed_slot_signal_handler, signal=signals.feed_slot_closed
        )
        feed_exporter.open_spider(spider)
        for item in self.items:
            feed_exporter.item_scraped(item, spider)
        await feed_exporter.close_spider(spider)

    @coroutine_test
    async def test_feed_exporter_signals_sent(self) -> None:

Calls 6

get_crawler · Function · 0.90
from_crawler · Method · 0.45
connect · Method · 0.45
open_spider · Method · 0.45
item_scraped · Method · 0.45
close_spider · Method · 0.45

Tested by

no test coverage detected