query.with_lockmode performs a 'version check' on an already loaded instance
(self)
@provision.allow_stale_updates
@engines.close_open_connections
def test_versioncheck(self):
    """A ``with_for_update`` get of an already loaded instance checks
    its version id against the version id loaded from the database.

    While the first session still holds the old version the get is
    expected to raise ``StaleDataError``; after a refresh, and again
    after the session has been closed, the same get must succeed.
    """

    Foo = self.classes.Foo

    # first session: persist a single row
    first_sess = self._fixture()
    first_obj = Foo(value="f1 value")
    first_sess.add(first_obj)
    first_sess.commit()

    # second session: modify the same row independently
    second_sess = fixture_session()
    second_obj = second_sess.get(Foo, first_obj.id)
    second_obj.value = "f1 new value"
    with conditional_sane_rowcount_warnings(update=True):
        second_sess.commit()

    # the first session still carries the old version id, so the
    # locking get has to report the mismatch
    mismatch_pattern = (
        r"Instance .* has version id '\d+' which does not "
        r"match database-loaded version id '\d+'"
    )
    assert_raises_message(
        sa.orm.exc.StaleDataError,
        mismatch_pattern,
        first_sess.get,
        Foo,
        first_obj.id,
        with_for_update={"read": True},
    )

    # refresh the instance in the first session before retrying
    first_sess.refresh(first_obj, with_for_update=dict(read=True))

    # with the refreshed instance the same get no longer raises
    first_sess.get(Foo, first_obj.id, with_for_update={"read": True})

    # after closing the session, a brand new load must pass as well
    first_sess.close()
    first_sess.get(Foo, first_obj.id, with_for_update={"read": True})
| 385 | |
| 386 | def test_versioncheck_not_versioned(self): |
| 387 | """ensure the versioncheck logic skips if there isn't a |
nothing calls this directly
no test coverage detected