| 82 | |
| 83 | |
| 84 | class EventDispatcher(EventDispatcherInterface): |
| 85 | # TODO: Make the dispatcher accept external mappings. |
def __init__(
    self,
    event_listeners: Optional[
        Dict[Type[object], List[EventListenerInterface]]
    ] = None,
):
    """
    Set up the dispatcher with its default listeners and its locks.

    :param event_listeners: Optional mapping of event type to a list of
        listeners. When truthy it is handed to ``register_listeners``
        once the defaults and locks are in place; ``None`` or an empty
        mapping leaves only the default listeners registered.
    """
    # One freshly constructed listener per built-in event type. The pairs
    # are walked in order, so listeners are instantiated and inserted in
    # exactly this sequence.
    default_bindings = (
        (AfterConnectionReleasedEvent, ReAuthConnectionListener),
        (AfterPooledConnectionsInstantiationEvent, RegisterReAuthForPooledConnections),
        (AfterSingleConnectionInstantiationEvent, RegisterReAuthForSingleConnection),
        (AfterPubSubConnectionInstantiationEvent, RegisterReAuthForPubSub),
        (AfterAsyncClusterInstantiationEvent, RegisterReAuthForAsyncClusterNodes),
        (AsyncAfterConnectionReleasedEvent, AsyncReAuthConnectionListener),
    )
    self._event_listeners_mapping: Dict[
        Type[object], List[EventListenerInterface]
    ] = {
        event_type: [listener_factory()]
        for event_type, listener_factory in default_bindings
    }

    # The lock is reentrant on purpose: a finalizer or listener running on
    # the same thread while the lock is already held (for instance a
    # weakref.finalize callback triggered by cyclic GC during an
    # allocation inside register_listeners / unregister_listeners) must be
    # able to acquire it again instead of deadlocking.
    self._lock = threading.RLock()
    # Left unset here; dispatch_async creates the asyncio.Lock on first use.
    self._async_lock = None

    if not event_listeners:
        return
    self.register_listeners(event_listeners)
| 123 | |
def dispatch(self, event: object):
    """
    Synchronously pass ``event`` to each listener registered for its type.

    Listeners are looked up by the exact ``type(event)``; an event type
    with no entry in the mapping results in no calls. Listeners run in
    their registered order, and an exception raised by one of them
    propagates to the caller, so later listeners are not invoked.

    :param event: The event object to deliver.
    """
    event_type = type(event)

    # Copy the registered listeners while holding the lock, and let go of
    # the lock before any listener code runs. Keeping it held during
    # listener execution would risk a deadlock whenever a listener calls
    # register_listeners / unregister_listeners / dispatch on this
    # dispatcher.
    with self._lock:
        snapshot = tuple(self._event_listeners_mapping.get(event_type, ()))

    for handler in snapshot:
        handler.listen(event)
| 133 | |
| 134 | async def dispatch_async(self, event: object): |
| 135 | if self._async_lock is None: |
| 136 | self._async_lock = asyncio.Lock() |
| 137 | |
| 138 | # Snapshot listeners under the lock, then release it before awaiting |
| 139 | # them. See the note in dispatch(); the same rationale applies here |
| 140 | # for dispatch_async re-entry from within a listener. |
| 141 | async with self._async_lock: |
no outgoing calls