publish is the internal function used to publish messages to a nats-server. It sends a protocol data message by queuing it into the bufio writer and kicking the flush goroutine. These writes should be protected by the connection lock (nc.mu).
(subj, reply string, validateReply bool, hdr, data []byte)
| 4422 | // Sends a protocol data message by queuing into the bufio writer |
| 4423 | // and kicking the flush go routine. These writes should be protected. |
| 4424 | func (nc *Conn) publish(subj, reply string, validateReply bool, hdr, data []byte) error { |
| 4425 | if nc == nil { |
| 4426 | return ErrInvalidConnection |
| 4427 | } |
| 4428 | if !nc.Opts.SkipSubjectValidation { |
| 4429 | if err := validateSubject(subj); err != nil { |
| 4430 | return err |
| 4431 | } |
| 4432 | if validateReply { |
| 4433 | if err := validateSubject(reply); err != nil { |
| 4434 | return ErrBadSubject |
| 4435 | } |
| 4436 | } |
| 4437 | } else if subj == _EMPTY_ { |
| 4438 | return ErrBadSubject |
| 4439 | } |
| 4440 | nc.mu.Lock() |
| 4441 | |
| 4442 | // Check if headers attempted to be sent to server that does not support them. |
| 4443 | if len(hdr) > 0 && !nc.info.Headers { |
| 4444 | nc.mu.Unlock() |
| 4445 | return ErrHeadersNotSupported |
| 4446 | } |
| 4447 | |
| 4448 | if nc.isClosed() { |
| 4449 | nc.mu.Unlock() |
| 4450 | return ErrConnectionClosed |
| 4451 | } |
| 4452 | |
| 4453 | if nc.isDrainingPubs() { |
| 4454 | nc.mu.Unlock() |
| 4455 | return ErrConnectionDraining |
| 4456 | } |
| 4457 | |
| 4458 | // Proactively reject payloads over the threshold set by server. |
| 4459 | msgSize := int64(len(data) + len(hdr)) |
| 4460 | // Skip this check if we are not yet connected (RetryOnFailedConnect) |
| 4461 | if !nc.initc && msgSize > nc.info.MaxPayload { |
| 4462 | nc.mu.Unlock() |
| 4463 | return ErrMaxPayload |
| 4464 | } |
| 4465 | |
| 4466 | // Check if we are reconnecting, and if so check if |
| 4467 | // we have exceeded our reconnect outbound buffer limits. |
| 4468 | if nc.bw.atLimitIfUsingPending() { |
| 4469 | nc.mu.Unlock() |
| 4470 | return ErrReconnectBufExceeded |
| 4471 | } |
| 4472 | |
| 4473 | var mh []byte |
| 4474 | if hdr != nil { |
| 4475 | mh = nc.scratch[:len(_HPUB_P_)] |
| 4476 | } else { |
| 4477 | mh = nc.scratch[1:len(_HPUB_P_)] |
| 4478 | } |
| 4479 | mh = append(mh, subj...) |
| 4480 | mh = append(mh, ' ') |
| 4481 | if reply != "" { |
no test coverage detected