MCPcopy
hub / github.com/socketio/socket.io / setupPrimaryWithRedis

Function setupPrimaryWithRedis

packages/socket.io-cluster-engine/lib/redis.ts:31–122  ·  view source on GitHub ↗
(
  pubClient: any,
  subClient: any,
  opts?: PrimaryWithRedisOptions,
)

Source from the content-addressed store, hash-verified

29}
30
31export function setupPrimaryWithRedis(
32 pubClient: any,
33 subClient: any,
34 opts?: PrimaryWithRedisOptions,
35) {
36 const primaryId = randomUUID();
37 const prefix = opts?.channelPrefix || "engine.io";
38 const channels = [channelName(prefix), channelName(prefix, primaryId)];
39
40 debug("subscribing to redis channels: %s", channels);
41 SUBSCRIBE(subClient, channels, (buffer: Buffer) => {
42 let message: Message & { _source?: string; _primaryId?: string };
43 try {
44 message = decode(buffer) as Message;
45 } catch (e) {
46 debug("ignore malformed buffer");
47 return;
48 }
49
50 if (message._source !== MESSAGE_SOURCE) {
51 debug("ignore message from unknown source");
52 return;
53 }
54
55 if (message._primaryId === primaryId) {
56 debug("ignore message from self");
57 return;
58 }
59
60 debug("received message: %j", message);
61
62 // @ts-expect-error recipientId is not defined for all messages
63 const recipientId = (message as Message).recipientId;
64 if (recipientId) {
65 for (const worker of Object.values(cluster.workers)) {
66 if (worker[kNodeId] === recipientId) {
67 debug("forward message to worker %d", worker.id);
68 worker.send(message, null, ignoreError);
69 return;
70 }
71 }
72 }
73
74 debug("forward message to all workers");
75 for (const worker of Object.values(cluster.workers)) {
76 worker.send(message, null, ignoreError);
77 }
78 });
79
80 cluster.on(
81 "message",
82 (
83 sourceWorker,
84 message: Message & { _source?: string; _primaryId?: string },
85 ) => {
86 if (message._source !== MESSAGE_SOURCE) {
87 debug("ignore message from unknown source");
88 return;

Callers

nothing calls this directly

Calls 8

channelName (Function) · 0.85
debug (Function) · 0.85
SUBSCRIBE (Function) · 0.85
decode (Function) · 0.50
encode (Function) · 0.50
send (Method) · 0.45
on (Method) · 0.45
publish (Method) · 0.45

Tested by

no test coverage detected