MCPcopy
hub / github.com/nats-io/nats.go / convertDirectGetMsgResponseToMsg

Function convertDirectGetMsgResponseToMsg

jsm.go:1331–1395  ·  view source on GitHub ↗
(name string, r *Msg)

Source from the content-addressed store, hash-verified

1329}
1330
1331func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error) {
1332 // Check for 404/408. We would get a no-payload message and a "Status" header
1333 if len(r.Data) == 0 {
1334 val := r.Header.Get(statusHdr)
1335 if val != _EMPTY_ {
1336 switch val {
1337 case noMessagesSts:
1338 return nil, ErrMsgNotFound
1339 default:
1340 desc := r.Header.Get(descrHdr)
1341 if desc == _EMPTY_ {
1342 desc = "unable to get message"
1343 }
1344 return nil, fmt.Errorf("nats: %s", desc)
1345 }
1346 }
1347 }
1348 // Check for headers that give us the required information to
1349 // reconstruct the message.
1350 if len(r.Header) == 0 {
1351 return nil, errors.New("nats: response should have headers")
1352 }
1353 stream := r.Header.Get(JSStream)
1354 if stream == _EMPTY_ {
1355 return nil, errors.New("nats: missing stream header")
1356 }
1357
1358 // Mirrors can now answer direct gets, so removing check for name equality.
1359 // TODO(dlc) - We could have server also have a header with origin and check that?
1360
1361 seqStr := r.Header.Get(JSSequence)
1362 if seqStr == _EMPTY_ {
1363 return nil, errors.New("nats: missing sequence header")
1364 }
1365 seq, err := strconv.ParseUint(seqStr, 10, 64)
1366 if err != nil {
1367 return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", seqStr, err)
1368 }
1369 timeStr := r.Header.Get(JSTimeStamp)
1370 if timeStr == _EMPTY_ {
1371 return nil, errors.New("nats: missing timestamp header")
1372 }
1373 // Temporary code: the server in main branch is sending with format
1374 // "2006-01-02 15:04:05.999999999 +0000 UTC", but will be changed
1375 // to use format RFC3339Nano. Because of server test deps/cycle,
1376 // support both until the server PR lands.
1377 tm, err := time.Parse(time.RFC3339Nano, timeStr)
1378 if err != nil {
1379 tm, err = time.Parse("2006-01-02 15:04:05.999999999 +0000 UTC", timeStr)
1380 if err != nil {
1381 return nil, fmt.Errorf("nats: invalid timestamp header '%s': %v", timeStr, err)
1382 }
1383 }
1384 subj := r.Header.Get(JSSubject)
1385 if subj == _EMPTY_ {
1386 return nil, errors.New("nats: missing subject header")
1387 }
1388 return &RawStreamMsg{

Callers 2

getMsgMethod · 0.70

Calls 2

ErrorfMethod · 0.80
GetMethod · 0.65