clusterWriteLoop implements the write loop for a multiplexing (proxy) session at the node which hosts the master topic. The session is a multiplexing session, i.e. it handles requests for multiple sessions at the origin.
(forTopic string)
| 1197 | // clusterWriteLoop implements the write loop for a multiplexing (proxy) session at the node which hosts the master topic. |
| 1198 | // The session is a multiplexing session, i.e. it handles requests for multiple sessions at the origin. |
| 1199 | func (sess *Session) clusterWriteLoop(forTopic string) { |
| 1200 | terminate := true |
| 1201 | defer func() { |
| 1202 | if terminate { |
| 1203 | sess.closeRPC() |
| 1204 | globals.sessionStore.Delete(sess) |
| 1205 | sess.inflightReqs = nil |
| 1206 | sess.unsubAll() |
| 1207 | } |
| 1208 | }() |
| 1209 | |
| 1210 | for { |
| 1211 | select { |
| 1212 | case msg, ok := <-sess.send: |
| 1213 | if !ok || sess.clnode.endpoint == nil { |
| 1214 | // channel closed |
| 1215 | return |
| 1216 | } |
| 1217 | srvMsg := msg.(*ServerComMessage) |
| 1218 | response := &ClusterResp{SrvMsg: srvMsg} |
| 1219 | if srvMsg.sess == nil { |
| 1220 | response.OrigSid = "*" |
| 1221 | } else { |
| 1222 | response.OrigReqType = srvMsg.sess.proxyReq |
| 1223 | response.OrigSid = srvMsg.sess.sid |
| 1224 | srvMsg.AsUser = srvMsg.sess.uid.UserId() |
| 1225 | |
| 1226 | switch srvMsg.sess.proxyReq { |
| 1227 | case ProxyReqJoin, ProxyReqLeave, ProxyReqMeta, ProxyReqBgSession, ProxyReqMeUserAgent, ProxyReqCall: |
| 1228 | // Do nothing |
| 1229 | case ProxyReqBroadcast, ProxyReqNone: |
| 1230 | if srvMsg.Data != nil || srvMsg.Pres != nil || srvMsg.Info != nil { |
| 1231 | response.OrigSid = "*" |
| 1232 | } else if srvMsg.Ctrl == nil { |
| 1233 | logs.Warn.Println("cluster: request type not set in clusterWriteLoop", sess.sid, |
| 1234 | srvMsg.describe(), "src_sid:", srvMsg.sess.sid) |
| 1235 | } |
| 1236 | default: |
| 1237 | logs.Err.Panicln("cluster: unknown request type in clusterWriteLoop", srvMsg.sess.proxyReq) |
| 1238 | } |
| 1239 | } |
| 1240 | |
| 1241 | srvMsg.RcptTo = forTopic |
| 1242 | response.RcptTo = forTopic |
| 1243 | |
| 1244 | if err := sess.clnode.masterToProxyAsync(response); err != nil { |
| 1245 | logs.Warn.Printf("cluster: response to proxy failed \"%s\": %s", sess.sid, err.Error()) |
| 1246 | return |
| 1247 | } |
| 1248 | case msg := <-sess.stop: |
| 1249 | if msg == nil { |
| 1250 | // Terminating multiplexing session. |
| 1251 | return |
| 1252 | } |
| 1253 | // There are two cases of msg != nil: |
| 1254 | // * user is being deleted |
| 1255 | // * node shutdown |
| 1256 | // In both cases the msg does not need to be forwarded to the proxy. |
no test coverage detected