MCPcopy
hub / github.com/redis/redis-py / EventDispatcher

Class EventDispatcher

redis/event.py:84–183  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

82
83
84class EventDispatcher(EventDispatcherInterface):
85 # TODO: Make dispatcher to accept external mappings.
86 def __init__(
87 self,
88 event_listeners: Optional[
89 Dict[Type[object], List[EventListenerInterface]]
90 ] = None,
91 ):
92 """
93 Dispatcher that dispatches events to listeners associated with given event.
94 """
95 self._event_listeners_mapping: Dict[
96 Type[object], List[EventListenerInterface]
97 ] = {
98 AfterConnectionReleasedEvent: [
99 ReAuthConnectionListener(),
100 ],
101 AfterPooledConnectionsInstantiationEvent: [
102 RegisterReAuthForPooledConnections(),
103 ],
104 AfterSingleConnectionInstantiationEvent: [
105 RegisterReAuthForSingleConnection()
106 ],
107 AfterPubSubConnectionInstantiationEvent: [RegisterReAuthForPubSub()],
108 AfterAsyncClusterInstantiationEvent: [RegisterReAuthForAsyncClusterNodes()],
109 AsyncAfterConnectionReleasedEvent: [
110 AsyncReAuthConnectionListener(),
111 ],
112 }
113
114 # Reentrant so a finalizer/listener that runs on the same thread
115 # while the lock is held (e.g. a weakref.finalize callback fired
116 # from cyclic GC during an allocation inside register_listeners /
117 # unregister_listeners) can re-enter without deadlocking.
118 self._lock = threading.RLock()
119 self._async_lock = None
120
121 if event_listeners:
122 self.register_listeners(event_listeners)
123
124 def dispatch(self, event: object):
125 # Snapshot listeners under the lock, then release it before invoking
126 # them. Holding the lock across listener execution would turn any
127 # listener that calls register_listeners / unregister_listeners /
128 # dispatch back into the dispatcher into a deadlock.
129 with self._lock:
130 listeners = list(self._event_listeners_mapping.get(type(event), []))
131 for listener in listeners:
132 listener.listen(event)
133
134 async def dispatch_async(self, event: object):
135 if self._async_lock is None:
136 self._async_lock = asyncio.Lock()
137
138 # Snapshot listeners under the lock, then release it before awaiting
139 # them. See the note in dispatch(); the same rationale applies here
140 # for dispatch_async re-entry from within a listener.
141 async with self._async_lock:

Calls

no outgoing calls