MCPcopy
hub / github.com/nats-io/nats.go / publish

Method publish

nats.go:4424–4537  ·  nats.go::Conn.publish

publish is the internal function to publish messages to a nats-server. Sends a protocol data message by queuing into the bufio writer and kicking the flush go routine. These writes should be protected.

(subj, reply string, validateReply bool, hdr, data []byte)

Source from the content-addressed store, hash-verified

4422// Sends a protocol data message by queuing into the bufio writer
4423// and kicking the flush go routine. These writes should be protected.
4424func (nc *Conn) publish(subj, reply string, validateReply bool, hdr, data []byte) error {
4425 if nc == nil {
4426 return ErrInvalidConnection
4427 }
4428 if !nc.Opts.SkipSubjectValidation {
4429 if err := validateSubject(subj); err != nil {
4430 return err
4431 }
4432 if validateReply {
4433 if err := validateSubject(reply); err != nil {
4434 return ErrBadSubject
4435 }
4436 }
4437 } else if subj == _EMPTY_ {
4438 return ErrBadSubject
4439 }
4440 nc.mu.Lock()
4441
4442 // Check if headers attempted to be sent to server that does not support them.
4443 if len(hdr) > 0 && !nc.info.Headers {
4444 nc.mu.Unlock()
4445 return ErrHeadersNotSupported
4446 }
4447
4448 if nc.isClosed() {
4449 nc.mu.Unlock()
4450 return ErrConnectionClosed
4451 }
4452
4453 if nc.isDrainingPubs() {
4454 nc.mu.Unlock()
4455 return ErrConnectionDraining
4456 }
4457
4458 // Proactively reject payloads over the threshold set by server.
4459 msgSize := int64(len(data) + len(hdr))
4460 // Skip this check if we are not yet connected (RetryOnFailedConnect)
4461 if !nc.initc && msgSize > nc.info.MaxPayload {
4462 nc.mu.Unlock()
4463 return ErrMaxPayload
4464 }
4465
4466 // Check if we are reconnecting, and if so check if
4467 // we have exceeded our reconnect outbound buffer limits.
4468 if nc.bw.atLimitIfUsingPending() {
4469 nc.mu.Unlock()
4470 return ErrReconnectBufExceeded
4471 }
4472
4473 var mh []byte
4474 if hdr != nil {
4475 mh = nc.scratch[:len(_HPUB_P_)]
4476 } else {
4477 mh = nc.scratch[1:len(_HPUB_P_)]
4478 }
4479 mh = append(mh, subj...)
4480 mh = append(mh, ' ')
4481 if reply != "" {

Callers 9

PublishMethod · 0.95
PublishMsgMethod · 0.95
PublishRequestMethod · 0.95
oldRequestMethod · 0.95
oldRequestWithContextMethod · 0.95
PublishMsgAsyncMethod · 0.80
PublishMethod · 0.80
PublishRequestMethod · 0.80

Calls 6

isClosedMethod · 0.95
isDrainingPubsMethod · 0.95
kickFlusherMethod · 0.95
atLimitIfUsingPendingMethod · 0.80
appendBufsMethod · 0.80
validateSubjectFunction · 0.70

Tested by

no test coverage detected