MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / SQLAsserter

Class SQLAsserter

lib/sqlalchemy/testing/assertsql.py:452–474  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

450
451
class SQLAsserter:
    """Accumulate observed SQL statements and check them against rules.

    Statements are collected in ``accumulated`` (populated by code outside
    this class).  ``_close()`` freezes the collection into ``_final``;
    ``assert_()`` reads ``_final`` and therefore must be called only after
    ``_close()``.
    """

    def __init__(self):
        # Filled in externally while statements are being recorded.
        self.accumulated = []

    def _close(self):
        """Freeze the recorded statements.

        Moves ``accumulated`` to ``_final`` and deletes ``accumulated`` so
        that any further attempt to record fails with ``AttributeError``.
        """
        self._final = self.accumulated
        del self.accumulated

    def assert_(self, *rules):
        """Check the frozen statements against ``rules``.

        The rules are combined with ``EachOf(*rules)`` and each recorded
        statement is passed to ``process_statement()`` in order until the
        combined rule reports ``is_consumed``.

        :raises AssertionError: if the rule sets ``errormessage`` after
         processing a statement, or if statements are left over once the
         rule is consumed.  When the statements run out first,
         ``rule.no_more_statements()`` is invoked (presumably it raises;
         its behaviour is defined by the rule, not here).
        """
        rule = EachOf(*rules)

        # Iterate over a snapshot so ``_final`` is never mutated and a
        # second ``assert_()`` call sees the same statements.  A single
        # iterator replaces repeated ``pop(0)`` (O(n) each); whatever it
        # still holds after the loop is exactly the unprocessed tail.
        pending = iter(list(self._final))
        for statement in pending:
            rule.process_statement(statement)
            if rule.is_consumed:
                break
            elif rule.errormessage:
                # Raise explicitly: ``assert False`` is removed under
                # ``python -O`` and the failure would pass silently.
                raise AssertionError(rule.errormessage)

        remaining = list(pending)
        if remaining:
            raise AssertionError(
                "Additional SQL statements remain:\n%s" % remaining
            )
        elif not rule.is_consumed:
            rule.no_more_statements()
475
476
477@contextlib.contextmanager

Callers 1

assert_engine · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected