(self, items, rows, settings=None)
| 123 | assert got_batch == expected_batch |
| 124 | |
| 125 | async def assertExportedMultiple(self, items, rows, settings=None): |
| 126 | settings = settings or {} |
| 127 | settings.update( |
| 128 | { |
| 129 | "FEEDS": { |
| 130 | self._random_temp_filename() / "xml" / self._file_mark: { |
| 131 | "format": "xml" |
| 132 | }, |
| 133 | self._random_temp_filename() / "json" / self._file_mark: { |
| 134 | "format": "json" |
| 135 | }, |
| 136 | }, |
| 137 | } |
| 138 | ) |
| 139 | batch_size = Settings(settings).getint("FEED_EXPORT_BATCH_ITEM_COUNT") |
| 140 | rows = [{k: v for k, v in row.items() if v} for row in rows] |
| 141 | data = await self.exported_data(items, settings) |
| 142 | # XML |
| 143 | xml_rows = rows.copy() |
| 144 | for batch in data["xml"]: |
| 145 | root = lxml.etree.fromstring(batch) |
| 146 | got_batch = [{e.tag: e.text for e in it} for it in root.findall("item")] |
| 147 | expected_batch, xml_rows = xml_rows[:batch_size], xml_rows[batch_size:] |
| 148 | assert got_batch == expected_batch |
| 149 | # JSON |
| 150 | json_rows = rows.copy() |
| 151 | for batch in data["json"]: |
| 152 | got_batch = json.loads(batch.decode("utf-8")) |
| 153 | expected_batch, json_rows = json_rows[:batch_size], json_rows[batch_size:] |
| 154 | assert got_batch == expected_batch |
| 155 | |
| 156 | async def assertExportedPickle(self, items, rows, settings=None): |
| 157 | settings = settings or {} |
nothing calls this directly
no test coverage detected