MCPcopy
hub / github.com/psycopg/psycopg / AttemptWithBackoff

Class AttemptWithBackoff

psycopg_pool/psycopg_pool/base.py:200–229  ·  view source on GitHub ↗

Keep the state of a repeated operation attempt with exponential backoff.

Source from the content-addressed store, hash-verified

198
199
200class AttemptWithBackoff:
201 """
202 Keep the state of a repeated operation attempt with exponential backoff.
203 """
204
205 INITIAL_DELAY = 1.0
206 DELAY_JITTER = 0.1
207 DELAY_BACKOFF = 2.0
208
209 def __init__(self, *, timeout: float):
210 self.timeout = timeout
211 self.delay = 0.0
212 self.give_up_at = 0.0
213
214 def update_delay(self, now: float) -> None:
215 """Calculate how long to wait for a new connection attempt"""
216 if self.delay == 0.0:
217 self.give_up_at = now + self.timeout
218 self.delay = BasePool._jitter(
219 self.INITIAL_DELAY, -self.DELAY_JITTER, self.DELAY_JITTER
220 )
221 else:
222 self.delay *= self.DELAY_BACKOFF
223
224 if self.delay + now > self.give_up_at:
225 self.delay = max(0.0, self.give_up_at - now)
226
227 def time_to_give_up(self, now: float) -> bool:
228 """Return True if we are tired of trying this attempt. Meh."""
229 return self.give_up_at > 0.0 and now >= self.give_up_at

Callers 4

_add_connectionMethod · 0.85
_add_connectionMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected