MCPcopy
hub / github.com/sqlalchemy/sqlalchemy / Deserializer

Class Deserializer

lib/sqlalchemy/ext/serializer.py:125–171  ·  lib/sqlalchemy/ext/serializer.py::Deserializer

Source from the content-addressed store, hash-verified

123
124
class Deserializer(pickle.Unpickler):
    """Unpickler that resolves persistent-id tokens back to live objects.

    Persistent ids are matched against the module-level ``our_ids`` pattern
    (defined outside this block), which must yield a token type in group 1
    and its argument string in group 2.  Each token is mapped back to a
    mapped attribute, mapper, table, column, session or engine using the
    ``metadata``, ``scoped_session`` and ``engine`` supplied here.

    SECURITY: besides unpickling ``file`` itself, ``persistent_load`` runs
    ``pickle.loads`` on base64 payloads embedded in the stream.  Only
    deserialize data from a trusted source.

    :param file: binary file-like object handed to ``pickle.Unpickler``.
    :param metadata: object whose ``tables`` mapping resolves ``table`` and
        ``column`` tokens; those tokens fail with ``AttributeError`` if this
        is left as ``None``.
    :param scoped_session: callable returning a session; used for
        ``session`` tokens and as a fallback source for the engine.
    :param engine: explicit engine returned for ``engine`` tokens.
    """

    def __init__(self, file, metadata=None, scoped_session=None, engine=None):
        super().__init__(file)
        self.metadata = metadata
        self.scoped_session = scoped_session
        self.engine = engine

    def get_engine(self):
        """Return the explicit engine, else the session's bind, else None."""
        if self.engine:
            return self.engine
        if self.scoped_session:
            # Call the session factory once and reuse the result instead of
            # invoking it a second time just to fetch ``.bind`` again.
            bind = self.scoped_session().bind
            if bind:
                return bind
        return None

    def persistent_load(self, id_):
        """Resolve one persistent id encountered while unpickling.

        :param id_: the persistent id; it is converted with ``str()``
            before being matched against ``our_ids``.
        :return: the object the token refers to, or ``None`` when ``id_``
            does not match ``our_ids`` at all.
        :raises Exception: ``"Unknown token: ..."`` when the pattern
            matches but the token type is not handled below.
        :raises AttributeError: for ``table``/``column`` tokens when
            ``metadata`` is ``None``.
        :raises TypeError: for ``session`` tokens when ``scoped_session``
            is ``None`` (it is called unconditionally).
        """
        m = our_ids.match(str(id_))
        if not m:
            return None

        type_, args = m.group(1, 2)
        # NOTE(security): several branches below unpickle a class reference
        # carried inside the token itself; never feed untrusted data here.
        if type_ == "attribute":
            key, clsarg = args.split(":")
            cls = pickle.loads(b64decode(clsarg))
            return getattr(cls, key)
        elif type_ == "mapper":
            cls = pickle.loads(b64decode(args))
            return class_mapper(cls)
        elif type_ == "mapper_selectable":
            cls = pickle.loads(b64decode(args))
            return class_mapper(cls).__clause_element__()
        elif type_ == "mapperprop":
            mapper, keyname = args.split(":")
            cls = pickle.loads(b64decode(mapper))
            return class_mapper(cls).attrs[keyname]
        elif type_ == "table":
            return self.metadata.tables[args]
        elif type_ == "column":
            table, colname = args.split(":")
            return self.metadata.tables[table].c[colname]
        elif type_ == "session":
            return self.scoped_session()
        elif type_ == "engine":
            return self.get_engine()
        else:
            raise Exception("Unknown token: %s" % type_)
172
173
174def dumps(obj, protocol=pickle.HIGHEST_PROTOCOL):

Callers 1

loads · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected