MCPcopy
hub / github.com/coder/websocket / subscribe

Method subscribe

internal/examples/chat/chat.go:114–158  ·  view source on GitHub ↗

subscribe subscribes the given WebSocket to all broadcast messages. It creates a subscriber with a buffered msgs chan to give some room to slower connections and then registers the subscriber. It then listens for all messages and writes them to the WebSocket. If the context is cancelled or an error

(w http.ResponseWriter, r *http.Request)

Source from the content-addressed store, hash-verified

112// It uses CloseRead to keep reading from the connection to process control
113// messages and cancel the context if the connection drops.
114func (cs *chatServer) subscribe(w http.ResponseWriter, r *http.Request) error {
115 var mu sync.Mutex
116 var c *websocket.Conn
117 var closed bool
118 s := &subscriber{
119 msgs: make(chan []byte, cs.subscriberMessageBuffer),
120 closeSlow: func() {
121 mu.Lock()
122 defer mu.Unlock()
123 closed = true
124 if c != nil {
125 c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
126 }
127 },
128 }
129 cs.addSubscriber(s)
130 defer cs.deleteSubscriber(s)
131
132 c2, err := websocket.Accept(w, r, nil)
133 if err != nil {
134 return err
135 }
136 mu.Lock()
137 if closed {
138 mu.Unlock()
139 return net.ErrClosed
140 }
141 c = c2
142 mu.Unlock()
143 defer c.CloseNow()
144
145 ctx := c.CloseRead(context.Background())
146
147 for {
148 select {
149 case msg := <-s.msgs:
150 err := writeTimeout(ctx, time.Second*5, c, msg)
151 if err != nil {
152 return err
153 }
154 case <-ctx.Done():
155 return ctx.Err()
156 }
157 }
158}
159
160// publish publishes the msg to all subscribers.
161// It never blocks and so messages to slow subscribers

Callers 1

subscribeHandlerMethod · 0.95

Calls 8

CloseMethod · 0.95
addSubscriberMethod · 0.95
deleteSubscriberMethod · 0.95
CloseNowMethod · 0.95
CloseReadMethod · 0.95
AcceptFunction · 0.92
writeTimeoutFunction · 0.85
LockMethod · 0.45

Tested by

no test coverage detected