Stop drains the endpoint subscriptions and marks the service as stopped.
()
| 706 | |
| 707 | // Stop drains the endpoint subscriptions and marks the service as stopped. |
| 708 | func (s *service) Stop() error { |
| 709 | s.m.Lock() |
| 710 | defer s.m.Unlock() |
| 711 | if s.stopped { |
| 712 | return nil |
| 713 | } |
| 714 | // make a copy of s.endpoints to range over in order to stop |
| 715 | // since *Endpoint.stop manipulates s.endpoints! |
| 716 | endpointsToStop := append(make([]*Endpoint, 0, len(s.endpoints)), s.endpoints...) |
| 717 | for _, e := range endpointsToStop { |
| 718 | if err := e.stop(); err != nil { |
| 719 | return err |
| 720 | } |
| 721 | } |
| 722 | var keys []string |
| 723 | for key, sub := range s.verbSubs { |
| 724 | keys = append(keys, key) |
| 725 | if err := sub.Drain(); err != nil { |
| 726 | // connection is closed so draining is not possible |
| 727 | if errors.Is(err, nats.ErrConnectionClosed) { |
| 728 | break |
| 729 | } |
| 730 | return fmt.Errorf("draining subscription for subject %q: %w", sub.Subject, err) |
| 731 | } |
| 732 | } |
| 733 | for _, key := range keys { |
| 734 | delete(s.verbSubs, key) |
| 735 | } |
| 736 | unwrapConnectionEventCallbacks(s.nc, s.natsHandlers) |
| 737 | s.stopped = true |
| 738 | if s.DoneHandler != nil { |
| 739 | s.asyncDispatcher.push(func() { s.DoneHandler(s) }) |
| 740 | } |
| 741 | s.asyncDispatcher.close() |
| 742 | return nil |
| 743 | } |
| 744 | |
| 745 | func (s *service) serviceIdentity() ServiceIdentity { |
| 746 | return ServiceIdentity{ |