hub / github.com/encode/uvicorn / send_keepalive_ping

Method send_keepalive_ping

uvicorn/protocols/websockets/wsproto_impl.py:287–298
(self)

Source from the content-addressed store, hash-verified

        self.ping_timer = self.loop.call_later(delay, self.send_keepalive_ping)

    def send_keepalive_ping(self) -> None:
        self.ping_timer = None
        if self.close_sent or self.transport.is_closing():  # pragma: no cover
            return
        # Random 4-byte payload identifies this ping; `handle_pong` uses it to ignore stale or unsolicited pongs.
        self.pending_ping_payload = struct.pack("!I", random.getrandbits(32))
        self.ping_sent_at = self.loop.time()
        self.transport.write(self.conn.send(wsproto.events.Ping(payload=self.pending_ping_payload)))
        if self.ping_timeout is not None:
            self.pong_timer = self.loop.call_later(self.ping_timeout, self.keepalive_timeout)
        else:  # pragma: no cover
            self.schedule_ping()

    def keepalive_timeout(self) -> None:
        self.pong_timer = None
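
The comment in the method says the random 4-byte payload lets `handle_pong` ignore stale or unsolicited pongs. The body of `handle_pong` is not shown here, so the following is only a minimal sketch of that matching idea under stated assumptions. `new_ping_payload` and `pong_matches` are hypothetical helper names, not part of uvicorn.

```python
import random
import struct
from typing import Optional


def new_ping_payload() -> bytes:
    # Pack a random 32-bit unsigned integer in network byte order,
    # giving a 4-byte payload, as in send_keepalive_ping.
    return struct.pack("!I", random.getrandbits(32))


def pong_matches(pending: Optional[bytes], pong_payload: bytes) -> bool:
    # Hypothetical check (assumption): accept a pong only when a ping is
    # outstanding and the pong echoes that ping's payload exactly.
    return pending is not None and pong_payload == pending
```

With this shape, a pong that arrives when no ping is pending, or that carries a different payload, would simply be ignored.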

Callers

nothing calls this directly; it is registered as a timer callback via self.loop.call_later(delay, self.send_keepalive_ping)

Calls 5

schedule_ping · Method · 0.95
is_closing · Method · 0.45
write · Method · 0.45
send · Method · 0.45
call_later · Method · 0.45
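
The `call_later` entry reflects how the method arms a pong timeout after writing the ping. The sketch below shows that timer pattern in isolation with asyncio. `KeepaliveSketch`, `on_pong`, and `demo` are hypothetical names, and cancelling the timer when a pong arrives is an assumption, since that part of uvicorn is not shown here.

```python
import asyncio
from typing import List, Optional


class KeepaliveSketch:
    """Illustrative stand-in for the ping/pong timers, not uvicorn's class."""

    def __init__(self, loop: asyncio.AbstractEventLoop, ping_timeout: Optional[float]) -> None:
        self.loop = loop
        self.ping_timeout = ping_timeout
        self.pong_timer: Optional[asyncio.TimerHandle] = None
        self.events: List[str] = []

    def send_ping(self) -> None:
        self.events.append("ping")
        if self.ping_timeout is not None:
            # Arm a timer that fires unless a pong arrives first.
            self.pong_timer = self.loop.call_later(self.ping_timeout, self.on_timeout)

    def on_pong(self) -> None:
        # Assumption: a pong cancels the pending timeout.
        if self.pong_timer is not None:
            self.pong_timer.cancel()
            self.pong_timer = None
        self.events.append("pong")

    def on_timeout(self) -> None:
        self.pong_timer = None
        self.events.append("timeout")


async def demo(answer: bool) -> List[str]:
    sketch = KeepaliveSketch(asyncio.get_running_loop(), ping_timeout=0.01)
    sketch.send_ping()
    if answer:
        sketch.on_pong()
    # Wait longer than ping_timeout so an uncancelled timer has time to fire.
    await asyncio.sleep(0.05)
    return sketch.events
```

Running `asyncio.run(demo(True))` exercises the answered path, and `asyncio.run(demo(False))` lets the timeout callback run.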

Tested by

no test coverage detected