| 1329 | } |
| 1330 | |
| 1331 | func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error) { |
| 1332 | // Check for 404/408. We would get a no-payload message and a "Status" header |
| 1333 | if len(r.Data) == 0 { |
| 1334 | val := r.Header.Get(statusHdr) |
| 1335 | if val != _EMPTY_ { |
| 1336 | switch val { |
| 1337 | case noMessagesSts: |
| 1338 | return nil, ErrMsgNotFound |
| 1339 | default: |
| 1340 | desc := r.Header.Get(descrHdr) |
| 1341 | if desc == _EMPTY_ { |
| 1342 | desc = "unable to get message" |
| 1343 | } |
| 1344 | return nil, fmt.Errorf("nats: %s", desc) |
| 1345 | } |
| 1346 | } |
| 1347 | } |
| 1348 | // Check for headers that give us the required information to |
| 1349 | // reconstruct the message. |
| 1350 | if len(r.Header) == 0 { |
| 1351 | return nil, errors.New("nats: response should have headers") |
| 1352 | } |
| 1353 | stream := r.Header.Get(JSStream) |
| 1354 | if stream == _EMPTY_ { |
| 1355 | return nil, errors.New("nats: missing stream header") |
| 1356 | } |
| 1357 | |
| 1358 | // Mirrors can now answer direct gets, so removing check for name equality. |
| 1359 | // TODO(dlc) - We could have server also have a header with origin and check that? |
| 1360 | |
| 1361 | seqStr := r.Header.Get(JSSequence) |
| 1362 | if seqStr == _EMPTY_ { |
| 1363 | return nil, errors.New("nats: missing sequence header") |
| 1364 | } |
| 1365 | seq, err := strconv.ParseUint(seqStr, 10, 64) |
| 1366 | if err != nil { |
| 1367 | return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", seqStr, err) |
| 1368 | } |
| 1369 | timeStr := r.Header.Get(JSTimeStamp) |
| 1370 | if timeStr == _EMPTY_ { |
| 1371 | return nil, errors.New("nats: missing timestamp header") |
| 1372 | } |
| 1373 | // Temporary code: the server in main branch is sending with format |
| 1374 | // "2006-01-02 15:04:05.999999999 +0000 UTC", but will be changed |
| 1375 | // to use format RFC3339Nano. Because of server test deps/cycle, |
| 1376 | // support both until the server PR lands. |
| 1377 | tm, err := time.Parse(time.RFC3339Nano, timeStr) |
| 1378 | if err != nil { |
| 1379 | tm, err = time.Parse("2006-01-02 15:04:05.999999999 +0000 UTC", timeStr) |
| 1380 | if err != nil { |
| 1381 | return nil, fmt.Errorf("nats: invalid timestamp header '%s': %v", timeStr, err) |
| 1382 | } |
| 1383 | } |
| 1384 | subj := r.Header.Get(JSSubject) |
| 1385 | if subj == _EMPTY_ { |
| 1386 | return nil, errors.New("nats: missing subject header") |
| 1387 | } |
| 1388 | return &RawStreamMsg{ |