WSGI application that invokes dirty apps and returns worker info. Routes: - GET /lightweight/ping - Call LightweightApp.ping() - GET /heavy/predict/<data> - Call HeavyApp.predict(data) - GET /config_limited/info - Call ConfigLimitedApp.get_info() - GET /status - Get overall status
(environ, start_response)
| 19 | |
| 20 | |
| 21 | def application(environ, start_response): |
| 22 | """ |
| 23 | WSGI application that invokes dirty apps and returns worker info. |
| 24 | |
| 25 | Routes: |
| 26 | - GET /lightweight/ping - Call LightweightApp.ping() |
| 27 | - GET /heavy/predict/<data> - Call HeavyApp.predict(data) |
| 28 | - GET /config_limited/info - Call ConfigLimitedApp.get_info() |
| 29 | - GET /status - Get overall status |
| 30 | """ |
| 31 | path = environ.get('PATH_INFO', '/') |
| 32 | method = environ.get('REQUEST_METHOD', 'GET') |
| 33 | |
| 34 | if method != 'GET': |
| 35 | start_response('405 Method Not Allowed', [('Content-Type', 'text/plain')]) |
| 36 | return [b'Method not allowed'] |
| 37 | |
| 38 | # Import dirty client here to avoid import at module load |
| 39 | from gunicorn.dirty import get_dirty_client |
| 40 | |
| 41 | try: |
| 42 | client = get_dirty_client() |
| 43 | |
| 44 | if path == '/status': |
| 45 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 46 | return [json.dumps({"status": "ok"}).encode()] |
| 47 | |
| 48 | elif path == '/lightweight/ping': |
| 49 | result = client.execute("app:LightweightApp", "ping") |
| 50 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 51 | return [json.dumps(result).encode()] |
| 52 | |
| 53 | elif path.startswith('/heavy/predict/'): |
| 54 | data = path.split('/')[-1] |
| 55 | result = client.execute("app:HeavyApp", "predict", data) |
| 56 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 57 | return [json.dumps(result).encode()] |
| 58 | |
| 59 | elif path == '/heavy/get_worker_id': |
| 60 | result = client.execute("app:HeavyApp", "get_worker_id") |
| 61 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 62 | return [json.dumps({"worker_id": result}).encode()] |
| 63 | |
| 64 | elif path == '/config_limited/info': |
| 65 | result = client.execute("app:ConfigLimitedApp", "get_info") |
| 66 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 67 | return [json.dumps(result).encode()] |
| 68 | |
| 69 | elif path == '/config_limited/get_worker_id': |
| 70 | result = client.execute("app:ConfigLimitedApp", "get_worker_id") |
| 71 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 72 | return [json.dumps({"worker_id": result}).encode()] |
| 73 | |
| 74 | elif path == '/lightweight/get_worker_id': |
| 75 | result = client.execute("app:LightweightApp", "get_worker_id") |
| 76 | start_response('200 OK', [('Content-Type', 'application/json')]) |
| 77 | return [json.dumps({"worker_id": result}).encode()] |
| 78 |
nothing calls this directly
no test coverage detected