Chan returns a `chan` that you can read incoming messages from. The returned `chan` will be closed when the WebSocket connection is closed. If there is an error reading from the WebSocket or decoding a value, the WebSocket will be closed. Safety: Chan must only be called once. Successive calls will panic.
()
| 25 | // |
| 26 | // Safety: Chan must only be called once. Successive calls will panic. |
| 27 | func (d *Decoder[T]) Chan() <-chan T { |
| 28 | if !d.chanCalled.CompareAndSwap(false, true) { |
| 29 | panic("chan called more than once") |
| 30 | } |
| 31 | values := make(chan T, 1) |
| 32 | go func() { |
| 33 | defer close(values) |
| 34 | defer d.conn.Close(websocket.StatusGoingAway, "") |
| 35 | for { |
| 36 | // we don't use d.ctx here because it only gets canceled after closing the connection |
| 37 | // and a "connection closed" type error is more clear than context canceled. |
| 38 | typ, b, err := d.conn.Read(context.Background()) |
| 39 | if err != nil { |
| 40 | // might be benign like EOF, so just log at debug |
| 41 | d.logger.Debug(d.ctx, "error reading from websocket", slog.Error(err)) |
| 42 | return |
| 43 | } |
| 44 | if typ != d.typ { |
| 45 | d.logger.Error(d.ctx, "websocket type mismatch while decoding") |
| 46 | return |
| 47 | } |
| 48 | var value T |
| 49 | err = json.Unmarshal(b, &value) |
| 50 | if err != nil { |
| 51 | d.logger.Error(d.ctx, "error unmarshalling", slog.Error(err)) |
| 52 | return |
| 53 | } |
| 54 | select { |
| 55 | case values <- value: |
| 56 | // OK |
| 57 | case <-d.ctx.Done(): |
| 58 | return |
| 59 | } |
| 60 | } |
| 61 | }() |
| 62 | return values |
| 63 | } |
| 64 | |
| 65 | // nolint: revive // complains that Encoder has the same function name |
| 66 | func (d *Decoder[T]) Close() error { |