Receive a single JSON data frame from a connection. Raise OSError if no data is received, if the data received is not valid JSON, or if it is not a dict.
(connection: IPCBase)
| 17 | |
| 18 | |
def receive(connection: IPCBase) -> Any:
    """Read one frame from *connection* and decode it as a JSON object.

    Raise OSError when the frame is empty, when it cannot be parsed as
    JSON, or when the decoded value is not a dict.
    """
    raw_frame = connection.read()
    if raw_frame:
        try:
            payload = json.loads(raw_frame)
        except Exception as err:
            # Any decoding failure is reported to the caller as an OSError,
            # with the underlying exception kept as the cause.
            raise OSError("Data received is not valid JSON") from err
        if isinstance(payload, dict):
            return payload
        raise OSError(f"Data received is not a dict ({type(payload)})")
    raise OSError("No data received")
| 35 | |
| 36 | |
| 37 | def send(connection: IPCBase, data: Any) -> None: |
no test coverage detected
searching dependent graphs…