Wrap an inner generator, persisting nodes/relationships as they flow through.
(
inner: Generator[PipelineEvent, None, None],
store: Store,
)
| 27 | |
| 28 | |
| 29 | def saving( |
| 30 | inner: Generator[PipelineEvent, None, None], |
| 31 | store: Store, |
| 32 | ) -> Generator[PipelineEvent, None, None]: |
| 33 | """Wrap an inner generator, persisting nodes/relationships as they flow through.""" |
| 34 | nodes_saved = 0 |
| 35 | rels_saved = 0 |
| 36 | |
| 37 | for event in inner: |
| 38 | if event.nodes: |
| 39 | for node in event.nodes: |
| 40 | store.save_node(node) |
| 41 | nodes_saved += len(event.nodes) |
| 42 | if event.relationships: |
| 43 | for rel in event.relationships: |
| 44 | store.save_relationship(rel) |
| 45 | rels_saved += len(event.relationships) |
| 46 | yield event |
| 47 | |
| 48 | store.flush() |
| 49 | yield PipelineEvent( |
| 50 | kind=EventKind.STAGE_STOP, |
| 51 | phase=Phase.SUBMITTING, |
| 52 | message=f"Saved {nodes_saved} nodes, {rels_saved} relationships", |
| 53 | ) |