Get prompts that are already cached. Async version.
(
params: Dict[str, Any],
prompts: List[str],
cache: Optional[Union[BaseCache, bool, None]] = None,
)
| 160 | |
| 161 | |
| 162 | async def aget_prompts( |
| 163 | params: Dict[str, Any], |
| 164 | prompts: List[str], |
| 165 | cache: Optional[Union[BaseCache, bool, None]] = None, |
| 166 | ) -> Tuple[Dict[int, List], str, List[int], List[str]]: |
| 167 | """Get prompts that are already cached. Async version.""" |
| 168 | llm_string = str(sorted([(k, v) for k, v in params.items()])) |
| 169 | missing_prompts = [] |
| 170 | missing_prompt_idxs = [] |
| 171 | existing_prompts = {} |
| 172 | llm_cache = _resolve_cache(cache) |
| 173 | for i, prompt in enumerate(prompts): |
| 174 | if llm_cache: |
| 175 | cache_val = await llm_cache.alookup(prompt, llm_string) |
| 176 | if isinstance(cache_val, list): |
| 177 | existing_prompts[i] = cache_val |
| 178 | else: |
| 179 | missing_prompts.append(prompt) |
| 180 | missing_prompt_idxs.append(i) |
| 181 | return existing_prompts, llm_string, missing_prompt_idxs, missing_prompts |
| 182 | |
| 183 | |
| 184 | def update_cache( |
no test coverage detected