Reports whether the given message is a user message and, if `checkSts` is true, also returns an appropriate error based on the content of the status header (404, etc.).
(msg *Msg, checkSts, isNoWait bool)
| 2903 | // `checkSts` is true, returns appropriate error based on the |
| 2904 | // content of the status (404, etc..) |
| 2905 | func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) { |
| 2906 | // Assume user message |
| 2907 | usrMsg = true |
| 2908 | |
| 2909 | // If payload or no header, consider this a user message |
| 2910 | if len(msg.Data) > 0 || len(msg.Header) == 0 { |
| 2911 | return |
| 2912 | } |
| 2913 | // Look for status header |
| 2914 | val := msg.Header.Get(statusHdr) |
| 2915 | // If not present, then this is considered a user message |
| 2916 | if val == _EMPTY_ { |
| 2917 | return |
| 2918 | } |
| 2919 | // At this point, this is not a user message since there is |
| 2920 | // no payload and a "Status" header. |
| 2921 | usrMsg = false |
| 2922 | |
| 2923 | // If we don't care about status, we are done. |
| 2924 | if !checkSts { |
| 2925 | return |
| 2926 | } |
| 2927 | |
| 2928 | // if it's a heartbeat message, report as not user msg |
| 2929 | if isHb, _ := isJSControlMessage(msg); isHb { |
| 2930 | return |
| 2931 | } |
| 2932 | switch val { |
| 2933 | case noResponders: |
| 2934 | err = ErrNoResponders |
| 2935 | case noMessagesSts: |
| 2936 | // 404 indicates that there are no messages. |
| 2937 | err = errNoMessages |
| 2938 | case reqTimeoutSts: |
| 2939 | // In case of a fetch request with no wait request and expires time, |
| 2940 | // need to skip 408 errors and retry. |
| 2941 | if isNoWait { |
| 2942 | err = errRequestsPending |
| 2943 | } else { |
| 2944 | // Older servers may send a 408 when a request in the server was expired |
| 2945 | // and interest is still found, which will be the case for our |
| 2946 | // implementation. Regardless, ignore 408 errors until receiving at least |
| 2947 | // one message when making requests without no_wait. |
| 2948 | err = ErrTimeout |
| 2949 | } |
| 2950 | case jetStream409Sts: |
| 2951 | if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "consumer deleted") { |
| 2952 | err = ErrConsumerDeleted |
| 2953 | break |
| 2954 | } |
| 2955 | |
| 2956 | if strings.Contains(strings.ToLower(msg.Header.Get(descrHdr)), "leadership change") { |
| 2957 | err = ErrConsumerLeadershipChanged |
| 2958 | break |
| 2959 | } |
| 2960 | fallthrough |
| 2961 | default: |
| 2962 | err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) |
no test coverage detected