| 169 | |
| 170 | |
class MultiFernet:
    """Present an ordered collection of Fernet instances as a single object.

    Encryption is always delegated to the first instance. Decryption,
    timestamp extraction and rotation try each instance in the order given
    and use the first one that does not raise ``InvalidToken``.
    """

    def __init__(self, fernets: Iterable[Fernet]):
        """Materialize *fernets* into a list and keep it.

        Raises:
            ValueError: if *fernets* yields no instances.
        """
        instances = list(fernets)
        if not instances:
            raise ValueError(
                "MultiFernet requires at least one Fernet instance"
            )
        self._fernets = instances

    def encrypt(self, msg: bytes) -> bytes:
        """Encrypt *msg* using the current time truncated to whole seconds."""
        now = int(time.time())
        return self.encrypt_at_time(msg, now)

    def encrypt_at_time(self, msg: bytes, current_time: int) -> bytes:
        """Encrypt *msg* with the first instance, stamped *current_time*."""
        primary = self._fernets[0]
        return primary.encrypt_at_time(msg, current_time)

    def rotate(self, msg: bytes | str) -> bytes:
        """Re-encrypt the token *msg* with the first instance.

        The token is decrypted by the first instance that accepts it, then
        encrypted again by the first instance in the list, reusing the
        token's original timestamp together with a fresh random 16-byte IV.

        Raises:
            InvalidToken: if no instance can decrypt the token.
        """
        timestamp, data = Fernet._get_unverified_token_data(msg)
        for candidate in self._fernets:
            try:
                plaintext = candidate._decrypt_data(data, timestamp, None)
            except InvalidToken:
                # This instance rejected the token; try the next one.
                continue
            # Decryption succeeded: re-encrypt outside the try so that
            # errors from encryption are never swallowed.
            iv = os.urandom(16)
            return self._fernets[0]._encrypt_from_parts(
                plaintext, timestamp, iv
            )
        raise InvalidToken

    def decrypt(self, msg: bytes | str, ttl: int | None = None) -> bytes:
        """Return the result of the first instance that decrypts *msg*.

        Raises:
            InvalidToken: if every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                plaintext = candidate.decrypt(msg, ttl)
            except InvalidToken:
                continue
            else:
                return plaintext
        raise InvalidToken

    def decrypt_at_time(
        self, msg: bytes | str, ttl: int, current_time: int
    ) -> bytes:
        """Like ``decrypt`` but forwards an explicit *current_time*.

        Raises:
            InvalidToken: if every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                plaintext = candidate.decrypt_at_time(msg, ttl, current_time)
            except InvalidToken:
                continue
            else:
                return plaintext
        raise InvalidToken

    def extract_timestamp(self, msg: bytes | str) -> int:
        """Return the timestamp reported by the first instance accepting *msg*.

        Raises:
            InvalidToken: if every instance rejects the token.
        """
        for candidate in self._fernets:
            try:
                stamp = candidate.extract_timestamp(msg)
            except InvalidToken:
                continue
            else:
                return stamp
        raise InvalidToken
no outgoing calls