(t *testing.T)
| 1786 | } |
| 1787 | |
| 1788 | func TestOrderedConsumerInfo(t *testing.T) { |
| 1789 | srv := RunBasicJetStreamServer() |
| 1790 | defer shutdownJSServerAndRemoveStorage(t, srv) |
| 1791 | nc, err := nats.Connect(srv.ClientURL()) |
| 1792 | if err != nil { |
| 1793 | t.Fatalf("Unexpected error: %v", err) |
| 1794 | } |
| 1795 | |
| 1796 | js, err := jetstream.New(nc) |
| 1797 | if err != nil { |
| 1798 | t.Fatalf("Unexpected error: %v", err) |
| 1799 | } |
| 1800 | defer nc.Close() |
| 1801 | |
| 1802 | ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) |
| 1803 | defer cancel() |
| 1804 | s, err := js.CreateStream(ctx, jetstream.StreamConfig{Name: "foo", Subjects: []string{"FOO.*"}}) |
| 1805 | if err != nil { |
| 1806 | t.Fatalf("Unexpected error: %v", err) |
| 1807 | } |
| 1808 | c, err := js.OrderedConsumer(ctx, "foo", jetstream.OrderedConsumerConfig{}) |
| 1809 | if err != nil { |
| 1810 | t.Fatalf("Unexpected error: %v", err) |
| 1811 | } |
| 1812 | |
| 1813 | cc, err := c.Consume(func(msg jetstream.Msg) {}) |
| 1814 | if err != nil { |
| 1815 | t.Fatalf("Unexpected error: %s", err) |
| 1816 | } |
| 1817 | defer cc.Stop() |
| 1818 | |
| 1819 | info, err := c.Info(ctx) |
| 1820 | if err != nil { |
| 1821 | t.Fatalf("Unexpected error: %s", err) |
| 1822 | } |
| 1823 | initialName := info.Name |
| 1824 | |
| 1825 | if err := s.DeleteConsumer(ctx, initialName); err != nil { |
| 1826 | t.Fatalf("Unexpected error: %s", err) |
| 1827 | } |
| 1828 | time.Sleep(50 * time.Millisecond) |
| 1829 | |
| 1830 | info, err = c.Info(ctx) |
| 1831 | if err != nil { |
| 1832 | t.Fatalf("Unexpected error: %s", err) |
| 1833 | } |
| 1834 | if info.Name == initialName { |
| 1835 | t.Fatalf("New consumer should be returned; got: %s", info.Name) |
| 1836 | } |
| 1837 | } |
| 1838 | |
| 1839 | func TestOrderedConsumerNextMaxWait(t *testing.T) { |
| 1840 | srv := RunBasicJetStreamServer() |
nothing calls this directly
no test coverage detected