()
| 536 | } |
| 537 | |
| 538 | func (s *service) wrapConnectionEventCallbacks() { |
| 539 | s.m.Lock() |
| 540 | defer s.m.Unlock() |
| 541 | s.natsHandlers.closed = s.nc.ClosedHandler() |
| 542 | if s.natsHandlers.closed != nil { |
| 543 | s.nc.SetClosedHandler(func(c *nats.Conn) { |
| 544 | s.Stop() |
| 545 | s.natsHandlers.closed(c) |
| 546 | }) |
| 547 | } else { |
| 548 | s.nc.SetClosedHandler(func(c *nats.Conn) { |
| 549 | s.Stop() |
| 550 | }) |
| 551 | } |
| 552 | |
| 553 | s.natsHandlers.asyncErr = s.nc.ErrorHandler() |
| 554 | if s.natsHandlers.asyncErr != nil { |
| 555 | s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { |
| 556 | if sub == nil { |
| 557 | s.natsHandlers.asyncErr(c, sub, err) |
| 558 | return |
| 559 | } |
| 560 | endpoint, match := s.matchSubscriptionSubject(sub.Subject) |
| 561 | if !match { |
| 562 | s.natsHandlers.asyncErr(c, sub, err) |
| 563 | return |
| 564 | } |
| 565 | if s.Config.ErrorHandler != nil { |
| 566 | s.Config.ErrorHandler(s, &NATSError{ |
| 567 | Subject: sub.Subject, |
| 568 | Description: err.Error(), |
| 569 | err: err, |
| 570 | }) |
| 571 | } |
| 572 | s.m.Lock() |
| 573 | if endpoint != nil { |
| 574 | endpoint.stats.NumErrors++ |
| 575 | endpoint.stats.LastError = err.Error() |
| 576 | } |
| 577 | s.m.Unlock() |
| 578 | if stopErr := s.Stop(); stopErr != nil { |
| 579 | s.natsHandlers.asyncErr(c, sub, errors.Join(err, fmt.Errorf("stopping service: %w", stopErr))) |
| 580 | } else { |
| 581 | s.natsHandlers.asyncErr(c, sub, err) |
| 582 | } |
| 583 | }) |
| 584 | } else { |
| 585 | s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { |
| 586 | if sub == nil { |
| 587 | return |
| 588 | } |
| 589 | endpoint, match := s.matchSubscriptionSubject(sub.Subject) |
| 590 | if !match { |
| 591 | return |
| 592 | } |
| 593 | if s.Config.ErrorHandler != nil { |
| 594 | s.Config.ErrorHandler(s, &NATSError{ |
| 595 | Subject: sub.Subject, |
no test coverage detected