Check if the message is a shutdown request.
(buf: ReadBuffer, expected_tag: Tag)
| 138 | |
| 139 | |
def should_shutdown(buf: ReadBuffer, expected_tag: Tag) -> bool:
    """Check if the message is a shutdown request.

    The tag is read from ``buf`` first. If it equals ``SCC_REQUEST_MESSAGE``,
    an int list is read next and must be empty; the message is then reported
    as a shutdown request. Any other tag must equal ``expected_tag``.

    Args:
        buf: Buffer the message is read from.
        expected_tag: Tag the message must carry when it is not a shutdown
            request.

    Returns:
        True for a shutdown request, False when the tag equals
        ``expected_tag``.

    Raises:
        AssertionError: If an ``SCC_REQUEST_MESSAGE`` is followed by a
            non-empty int list, or if the tag is neither
            ``SCC_REQUEST_MESSAGE`` nor ``expected_tag``.
    """
    tag = read_tag(buf)
    if tag == SCC_REQUEST_MESSAGE:
        # Perform the read outside of an ``assert`` statement: assertions are
        # removed under ``python -O``, which would otherwise skip the read
        # itself and not just the check on its result.
        payload = read_int_list(buf)
        if payload:
            raise AssertionError
        return True
    # Raised explicitly (same exception type and message as before) so the
    # check is not dropped when assertions are disabled.
    if tag != expected_tag:
        raise AssertionError(f"Unexpected tag: {tag}")
    return False
| 148 | |
| 149 | |
| 150 | def serve(server: IPCServer, ctx: ServerContext) -> None: |
no test coverage detected
searching dependent graphs…