Code
Hub
Workspaces
Connect
Indexed graphs
Engine
MCP
copy
hub
/
github.com/celery/celery
/ get_active_redis_channels
Function
get_active_redis_channels
t/integration/conftest.py:47–48 ·
view source on GitHub ↗
()
Source
from the content-addressed store, hash-verified
45
46
47
def
get_active_redis_channels():
48
return
get_redis_connection().execute_command(
'PUBSUB CHANNELS'
)
49
50
51
def
check_for_logs(caplog, message: str, max_wait: float = 1.0, interval: float = 0.1) -> bool:
Callers
4
test_redis_subscribed_channels_leak
Method · 0.85
test_ignoring_result_no_subscriptions
Method · 0.85
test_asyncresult_forget_cancels_subscription
Method · 0.85
test_asyncresult_get_cancels_subscription
Method · 0.85
Calls
1
get_redis_connection
Function · 0.90
Tested by
no test coverage detected