MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / chunks

Function chunks

lib/sqlalchemy/orm/loading.py:214–267  ·  view source on GitHub ↗
(size)

Source from the content-addressed store, hash-verified

212 )
213
214 def chunks(size): # type: ignore
215 while True:
216 yield_per = size
217
218 context.partials = {}
219
220 if yield_per:
221 if _uniquing_is_active:
222 raise sa_exc.InvalidRequestError(
223 "Can't use the ORM yield_per feature "
224 "in conjunction with unique()"
225 )
226 fetch = cursor.fetchmany(yield_per)
227
228 if not fetch:
229 break
230 else:
231 fetch = cursor._raw_all_tuples()
232
233 if single_entity:
234 proc = process[0]
235 rows = [proc(row) for row in fetch]
236 else:
237 rows = [
238 tuple([proc(row) for proc in process]) for row in fetch
239 ]
240
241 # if we are the originating load from a query, meaning we
242 # aren't being called as a result of a nested "post load",
243 # iterate through all the collected post loaders and fire them
244 # off. Previously this used to work recursively, however that
245 # prevented deeply nested structures from being loadable
246 if is_top_level:
247 if yield_per:
248 # if using yield per, memoize the state of the
249 # collection so that it can be restored
250 top_level_post_loads = list(
251 context.post_load_paths.items()
252 )
253
254 while context.post_load_paths:
255 post_loads = list(context.post_load_paths.items())
256 context.post_load_paths.clear()
257 for path, post_load in post_loads:
258 post_load.invoke(context, path)
259
260 if yield_per:
261 context.post_load_paths.clear()
262 context.post_load_paths.update(top_level_post_loads)
263
264 yield rows
265
266 if not yield_per:
267 break
268
269 if context.execution_options.get("prebuffer_rows", False):
270 # this is a bit of a hack at the moment.

Callers 1

instancesFunction · 0.85

Calls 6

_raw_all_tuplesMethod · 0.80
invokeMethod · 0.80
fetchmanyMethod · 0.45
itemsMethod · 0.45
clearMethod · 0.45
updateMethod · 0.45

Tested by

no test coverage detected