hub / github.com/benoitc/gunicorn / send_command

Method send_command

gunicorn/ctl/client.py:74–104

Send command and wait for response.

Args:
    command: Command string (e.g., "show workers")
    args: Optional additional arguments

Returns:
    Response data dictionary

Raises:
    ControlClientError: If communication fails

(self, command: str, args: list = None)

Source from the content-addressed store, hash-verified

        self._sock = None

    def send_command(self, command: str, args: list = None) -> dict:
        """
        Send command and wait for response.

        Args:
            command: Command string (e.g., "show workers")
            args: Optional additional arguments

        Returns:
            Response data dictionary

        Raises:
            ControlClientError: If communication fails
        """
        if not self._sock:
            self.connect()

        self._request_id += 1
        request = make_request(self._request_id, command, args)

        try:
            ControlProtocol.write_message(self._sock, request)
            response = ControlProtocol.read_message(self._sock)
        except Exception as e:
            self.close()
            raise ControlClientError(f"Communication error: {e}")

        if response.get("status") == "error":
            raise ControlClientError(response.get("error", "Unknown error"))

        return response.get("data", {})

    def __enter__(self):
        self.connect()
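
The last two statements of send_command define what a caller sees: a response whose "status" is "error" raises ControlClientError, and anything else yields the "data" field, or an empty dict when that field is absent. The sketch below isolates that step so it can be run without gunicorn. The helper name unwrap_response, the local ControlClientError stand-in, the "ok" status value, and the sample payloads are assumptions made for illustration; only the "status", "error", and "data" keys come from the source.

```python
class ControlClientError(Exception):
    """Local stand-in for the error class named in the source."""


def unwrap_response(response: dict) -> dict:
    """Hypothetical helper mirroring the final two steps of send_command."""
    # An error status is surfaced as an exception, with a fallback message.
    if response.get("status") == "error":
        raise ControlClientError(response.get("error", "Unknown error"))
    # Any other status returns the payload, defaulting to an empty dict.
    return response.get("data", {})


# Sample responses (shapes assumed for illustration only).
ok = unwrap_response({"status": "ok", "data": {"workers": 2}})
empty = unwrap_response({"status": "ok"})

try:
    unwrap_response({"status": "error"})
except ControlClientError as exc:
    fallback_message = str(exc)
```

One consequence worth noting for callers: an empty dict can mean either "the server sent no data" or "the server sent an empty payload", so the return value alone does not distinguish the two.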

Callers 11

run_interactive · Function · 0.95
run_command · Function · 0.80
test_show_workers · Method · 0.80
test_show_stats · Method · 0.80
test_help_command · Method · 0.80
test_worker_add · Method · 0.80
test_invalid_command · Method · 0.80

Calls 7

connect · Method · 0.95
close · Method · 0.95
make_request · Function · 0.90
ControlClientError · Class · 0.85
write_message · Method · 0.45
read_message · Method · 0.45
get · Method · 0.45
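
The calls listed above combine into a simple flow: connect lazily, send, and close the connection if the exchange fails, so that the next call reconnects. The sketch below reproduces that flow with a hypothetical SketchClient and an injected transport callable in place of the real socket and ControlProtocol. The request dict shape, the connect_count attribute, and the flaky transport are assumptions for illustration and do not reflect gunicorn's actual wire format.

```python
class ControlClientError(Exception):
    """Local stand-in for the error class named in the source."""


class SketchClient:
    """Hypothetical stand-in that mirrors send_command's control flow."""

    def __init__(self, transport):
        # transport: callable taking a request dict and returning a response dict
        self._transport = transport
        self._connected = False
        self._request_id = 0
        self.connect_count = 0  # bookkeeping for this sketch only

    def connect(self):
        self._connected = True
        self.connect_count += 1

    def close(self):
        self._connected = False

    def send_command(self, command, args=None):
        if not self._connected:
            self.connect()  # lazy connect, as in the source
        self._request_id += 1
        # Assumed request shape; the real make_request output is not shown.
        request = {"id": self._request_id, "command": command, "args": args or []}
        try:
            response = self._transport(request)
        except Exception as e:
            self.close()  # drop the connection so the next call reconnects
            raise ControlClientError(f"Communication error: {e}")
        if response.get("status") == "error":
            raise ControlClientError(response.get("error", "Unknown error"))
        return response.get("data", {})


def make_flaky_transport():
    """Transport that fails on the first call and succeeds afterwards."""
    state = {"calls": 0}

    def transport(request):
        state["calls"] += 1
        if state["calls"] == 1:
            raise OSError("broken pipe")
        return {"status": "ok", "data": {"echo": request["command"]}}

    return transport


client = SketchClient(make_flaky_transport())
try:
    client.send_command("show workers")
except ControlClientError as exc:
    first_error = str(exc)

# The failed call closed the connection, so this call connects again.
result = client.send_command("show workers")
```

In this sketch the request counter keeps increasing across the failure, which matches the source: self._request_id is incremented before the write is attempted.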

Tested by 9

test_show_workers · Method · 0.64
test_show_stats · Method · 0.64
test_help_command · Method · 0.64
test_worker_add · Method · 0.64
test_invalid_command · Method · 0.64