| 390 | } |
| 391 | |
| 392 | void onMessage(Object message) { |
| 393 | // Any received frame counts as keepalive — many exchanges (lbank, |
| 394 | // hashkey, krakenfutures, …) use application-level ping/pong rather |
| 395 | // than WS protocol PING/PONG, so onPong() never fires. Without this, |
| 396 | // pingLoop kills the connection after `keepAlive * maxPingPongMisses` |
| 397 | // ms even though the link is healthy and delivering data. |
| 398 | this.lastPong = System.currentTimeMillis(); |
| 399 | if (this.handleMessageCallback != null) { |
| 400 | // Offload to a per-client single-thread executor: keeps Netty's event loop |
| 401 | // unblocked AND preserves frame ordering per connection. Cross-frame races |
| 402 | // on shared exchange state (orderbook cache, balance sub-maps, etc.) are |
| 403 | // eliminated for same-client traffic. |
| 404 | messageExecutor.execute(() -> { |
| 405 | try { |
| 406 | if (this.verbose) { |
| 407 | System.out.println(getFormattedDate() + "OnMessage:" + message); |
| 408 | } |
| 409 | this.handleMessageCallback.accept(this, message); |
| 410 | } catch (Exception e) { |
| 411 | if (this.verbose) { |
| 412 | System.err.println("handleMessage error: " + e.getMessage()); |
| 413 | } |
| 414 | this.reject(e); |
| 415 | } |
| 416 | }); |
| 417 | } |
| 418 | } |
| 419 | |
| 420 | public void onPong() { |
| 421 | this.lastPong = System.currentTimeMillis(); |