Split by finding all regex matches across all inputs.
(self, inputs: List[Message])
| 72 | self.on_no_match = on_no_match |
| 73 | |
def split(self, inputs: List[Message]) -> List[List[Message]]:
    """Turn every regex match found in the inputs into its own unit.

    Each input message's ``text_content()`` is scanned with
    ``self.pattern``. Every match yields a one-element unit holding a
    new ``Message`` that keeps the source role, carries the selected
    group text as content, and copies the source metadata with
    ``"split_source": "regex"`` added.

    Group selection: when ``self.group`` is not ``None`` that group is
    used, falling back to the whole match if the lookup raises
    ``IndexError`` or ``re.error``. A group that yields ``None`` is
    stored as an empty string.

    Messages without any match are handled according to
    ``self.on_no_match``:

    * ``"pass"``  -- the original message becomes a unit unchanged.
    * ``"empty"`` -- a unit with empty content is emitted, with
      ``"split_no_match": True`` added to its metadata.
    * any other value -- the message contributes no unit.

    Args:
        inputs: Messages to scan.

    Returns:
        The collected units. If nothing at all was collected, every
        input message is returned as its own single-element unit.
    """
    result: List[List[Message]] = []

    for message in inputs:
        body = message.text_content()
        found = list(self.pattern.finditer(body))

        if found:
            for hit in found:
                if self.group is None:
                    captured = hit.group(0)
                else:
                    try:
                        captured = hit.group(self.group)
                    except (IndexError, re.error):
                        # Requested group is unavailable: use the whole match.
                        captured = hit.group(0)

                # A group that yielded None is stored as empty text.
                piece = Message(
                    role=message.role,
                    content="" if captured is None else captured,
                    metadata={**message.metadata, "split_source": "regex"},
                )
                result.append([piece])
            continue

        # Nothing matched in this message: apply the configured policy.
        if self.on_no_match == "pass":
            result.append([message])
        elif self.on_no_match == "empty":
            placeholder = Message(
                role=message.role,
                content="",
                metadata={
                    **message.metadata,
                    "split_source": "regex",
                    "split_no_match": True,
                },
            )
            result.append([placeholder])

    if result:
        return result

    # No unit was produced at all: hand back each input as its own unit.
    return [[message] for message in inputs]
| 119 | |
| 120 | |
| 121 | class JsonPathSplitter(Splitter): |
nothing calls this directly
no test coverage detected