hub / github.com/benoitc/gunicorn / app

Function app

benchmarks/dirty_bench_wsgi.py:29–152

WSGI application that calls dirty pool tasks.

(environ, start_response)

Source from the content-addressed store, hash-verified

def app(environ, start_response):
    """
    WSGI application that calls dirty pool tasks.

    Query parameters:
        action: Task action to call (default: sleep_task)
        duration: Duration in ms for sleep/cpu tasks (default: 10)
        sleep: Sleep duration for mixed_task (default: 50)
        cpu: CPU duration for mixed_task (default: 50)
        size: Payload size in bytes for payload_task (default: 100)
        intensity: CPU intensity for cpu/mixed tasks (default: 1.0)
        app: Dirty app path (default: benchmarks.dirty_bench_app:BenchmarkApp)

    Endpoints:
        / - Default sleep_task
        /sleep - sleep_task with ?duration=N
        /cpu - cpu_task with ?duration=N&intensity=N
        /mixed - mixed_task with ?sleep=N&cpu=N
        /payload - payload_task with ?size=N
        /echo - echo_task (POST body echoed)
        /stats - Get accumulated stats
        /health - Health check
    """
    path = environ.get('PATH_INFO', '/')
    method = environ.get('REQUEST_METHOD', 'GET')
    query = parse_qs(environ.get('QUERY_STRING', ''))

    # Helper to get query params with defaults
    def get_param(name, default, type_fn=int):
        values = query.get(name, [])
        if values:
            try:
                return type_fn(values[0])
            except (ValueError, TypeError):
                return default
        return default

    # Get app path from query or use default
    app_path = query.get('app', [BENCHMARK_APP])[0]

    try:
        client = get_dirty_client()

        # Route based on path
        if path in ('/', '/sleep'):
            duration = get_param('duration', 10)
            result = client.execute(app_path, "sleep_task", duration)

        elif path == '/cpu':
            duration = get_param('duration', 100)
            intensity = get_param('intensity', 1.0, float)
            result = client.execute(app_path, "cpu_task", duration, intensity)

        elif path == '/mixed':
            sleep_ms = get_param('sleep', 50)
            cpu_ms = get_param('cpu', 50)
            intensity = get_param('intensity', 1.0, float)
            result = client.execute(app_path, "mixed_task", sleep_ms, cpu_ms,
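
The nested get_param helper relies on how parse_qs represents a query string: every key maps to a list of string values, so the helper converts the first value and falls back to the default when the key is absent or the conversion fails. A minimal standalone sketch of that behaviour follows. The make_get_param factory and the sample query string are illustrative assumptions and are not part of the benchmark file.

```python
from urllib.parse import parse_qs


def make_get_param(query):
    """Hypothetical factory: binds the helper's logic to a parsed query dict."""
    def get_param(name, default, type_fn=int):
        values = query.get(name, [])
        if values:
            try:
                return type_fn(values[0])
            except (ValueError, TypeError):
                return default
        return default
    return get_param


# parse_qs collects repeated keys into one list, in order of appearance.
query = parse_qs("duration=25&intensity=0.5&size=abc&duration=99")
get_param = make_get_param(query)

duration = get_param("duration", 10)            # first of two values is used
intensity = get_param("intensity", 1.0, float)  # converted with float
size = get_param("size", 100)                   # "abc" is not an int, default is used
cpu = get_param("cpu", 50)                      # key absent, default is used
```

One consequence of this design is that malformed input never raises: a request such as ?size=abc is silently treated as if the parameter had been omitted.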

Callers

nothing calls this directly

Calls 7

get_dirty_client · Function · 0.90
get_param · Function · 0.85
decode · Method · 0.80
encode · Method · 0.80
get · Method · 0.45
execute · Method · 0.45
read · Method · 0.45
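
The call list includes read, decode and encode, none of which appear in the truncated excerpt. One plausible reading is that they belong to the /echo branch, which the docstring describes as echoing the POST body. The sketch below shows the general WSGI pattern for reading a request body as described in PEP 3333. It is not the file's actual code: read_body and the sample environ are hypothetical.

```python
import io


def read_body(environ):
    """Read at most CONTENT_LENGTH bytes from the WSGI input stream."""
    try:
        length = int(environ.get("CONTENT_LENGTH") or 0)
    except ValueError:
        length = 0  # a malformed header is treated as an empty body
    if length <= 0:
        return b""
    return environ["wsgi.input"].read(length)


# Hand-built environ for illustration; a real server supplies this dict.
environ = {
    "REQUEST_METHOD": "POST",
    "CONTENT_LENGTH": "5",
    "wsgi.input": io.BytesIO(b"hello world"),
}

body = read_body(environ)          # only the first 5 bytes are consumed
text = body.decode("utf-8")        # bytes to str, e.g. before passing to a task
response = text.encode("utf-8")    # str back to bytes for the WSGI response
```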

Tested by

no test coverage detected
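
Because every branch shown in the excerpt goes through client.execute, the routing could in principle be exercised without a running dirty pool by substituting a recording stub for the client. The following is a rough sketch under stated assumptions: FakeClient and dispatch are hypothetical, dispatch copies only the '/', '/sleep' and '/cpu' branches with their default arguments, and query parsing is left out.

```python
# Default app path as given in the docstring.
BENCHMARK_APP = "benchmarks.dirty_bench_app:BenchmarkApp"


class FakeClient:
    """Hypothetical stand-in for the object returned by get_dirty_client()."""

    def __init__(self):
        self.calls = []

    def execute(self, app_path, action, *args):
        # Record the call instead of sending it to a dirty worker.
        self.calls.append((app_path, action, args))
        return {"ok": True}


def dispatch(path, client, app_path=BENCHMARK_APP):
    """Reduced copy of the visible routing, using the default parameters."""
    if path in ("/", "/sleep"):
        return client.execute(app_path, "sleep_task", 10)
    if path == "/cpu":
        return client.execute(app_path, "cpu_task", 100, 1.0)
    raise LookupError(path)  # remaining branches are omitted in this sketch


client = FakeClient()
dispatch("/", client)
dispatch("/cpu", client)
```

In a real test the same stub could be swapped in for get_dirty_client with unittest.mock.patch, which would exercise app itself rather than a reduced copy.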