MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / test_deferred

Method test_deferred

test/orm/test_attributes.py:456–503  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

454 eq_(f1.b, [])
455
456 def test_deferred(self):
457 class Foo:
458 pass
459
460 data = {"a": "this is a", "b": 12}
461
462 def loader(state, keys, passive):
463 for k in keys:
464 state.dict[k] = data[k]
465 return attributes.ATTR_WAS_SET
466
467 instrumentation.register_class(Foo)
468 manager = attributes.manager_of_class(Foo)
469 manager.expired_attribute_loader = loader
470 _register_attribute(Foo, "a", uselist=False, useobject=False)
471 _register_attribute(Foo, "b", uselist=False, useobject=False)
472
473 f = Foo()
474 attributes.instance_state(f)._expire(
475 attributes.instance_dict(f), set()
476 )
477 eq_(f.a, "this is a")
478 eq_(f.b, 12)
479 f.a = "this is some new a"
480 attributes.instance_state(f)._expire(
481 attributes.instance_dict(f), set()
482 )
483 eq_(f.a, "this is a")
484 eq_(f.b, 12)
485 attributes.instance_state(f)._expire(
486 attributes.instance_dict(f), set()
487 )
488 f.a = "this is another new a"
489 eq_(f.a, "this is another new a")
490 eq_(f.b, 12)
491 attributes.instance_state(f)._expire(
492 attributes.instance_dict(f), set()
493 )
494 eq_(f.a, "this is a")
495 eq_(f.b, 12)
496 del f.a
497 eq_(f.a, None)
498 eq_(f.b, 12)
499 attributes.instance_state(f)._commit_all(
500 attributes.instance_dict(f), set()
501 )
502 eq_(f.a, None)
503 eq_(f.b, 12)
504
505 def test_deferred_pickleable(self):
506 data = {"a": "this is a", "b": 12}

Callers

nothing calls this directly

Calls 6

eq_Function · 0.90
manager_of_classMethod · 0.80
_expireMethod · 0.80
_commit_allMethod · 0.80
_register_attributeFunction · 0.70
FooClass · 0.70

Tested by

no test coverage detected