Metadata returns [MsgMetadata] for a JetStream message.
()
| 306 | |
| 307 | // Metadata returns [MsgMetadata] for a JetStream message. |
| 308 | func (m *jetStreamMsg) Metadata() (*MsgMetadata, error) { |
| 309 | if err := m.checkReply(); err != nil { |
| 310 | return nil, err |
| 311 | } |
| 312 | |
| 313 | tokens, err := parser.GetMetadataFields(m.msg.Reply) |
| 314 | if err != nil { |
| 315 | return nil, fmt.Errorf("%w: %s", ErrNotJSMessage, err) |
| 316 | } |
| 317 | |
| 318 | meta := &MsgMetadata{ |
| 319 | Domain: tokens[parser.AckDomainTokenPos], |
| 320 | NumDelivered: parser.ParseNum(tokens[parser.AckNumDeliveredTokenPos]), |
| 321 | NumPending: parser.ParseNum(tokens[parser.AckNumPendingTokenPos]), |
| 322 | Timestamp: time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))), |
| 323 | Stream: tokens[parser.AckStreamTokenPos], |
| 324 | Consumer: tokens[parser.AckConsumerTokenPos], |
| 325 | } |
| 326 | meta.Sequence.Stream = parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]) |
| 327 | meta.Sequence.Consumer = parser.ParseNum(tokens[parser.AckConsumerSeqTokenPos]) |
| 328 | return meta, nil |
| 329 | } |
| 330 | |
| 331 | // Data returns the message body. |
| 332 | func (m *jetStreamMsg) Data() []byte { |