(t *testing.T)
| 7004 | } |
| 7005 | |
| 7006 | func TestJetStream_ClusterMultipleSubscribe(t *testing.T) { |
| 7007 | nodes := []int{1, 3} |
| 7008 | replicas := []int{1} |
| 7009 | |
| 7010 | for _, n := range nodes { |
| 7011 | for _, r := range replicas { |
| 7012 | if r > 1 && n == 1 { |
| 7013 | continue |
| 7014 | } |
| 7015 | |
| 7016 | t.Run(fmt.Sprintf("qsub n=%d r=%d", n, r), func(t *testing.T) { |
| 7017 | name := fmt.Sprintf("MSUB%d%d", n, r) |
| 7018 | stream := &nats.StreamConfig{ |
| 7019 | Name: name, |
| 7020 | Replicas: r, |
| 7021 | } |
| 7022 | withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleQueueSubscribe) |
| 7023 | }) |
| 7024 | |
| 7025 | t.Run(fmt.Sprintf("psub n=%d r=%d", n, r), func(t *testing.T) { |
| 7026 | name := fmt.Sprintf("PSUBN%d%d", n, r) |
| 7027 | stream := &nats.StreamConfig{ |
| 7028 | Name: name, |
| 7029 | Replicas: n, |
| 7030 | } |
| 7031 | withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultiplePullSubscribe) |
| 7032 | }) |
| 7033 | |
| 7034 | t.Run(fmt.Sprintf("psub n=%d r=%d multi fetch", n, r), func(t *testing.T) { |
| 7035 | name := fmt.Sprintf("PFSUBN%d%d", n, r) |
| 7036 | stream := &nats.StreamConfig{ |
| 7037 | Name: name, |
| 7038 | Replicas: n, |
| 7039 | } |
| 7040 | withJSClusterAndStream(t, name, n, stream, testJetStream_ClusterMultipleFetchPullSubscribe) |
| 7041 | }) |
| 7042 | } |
| 7043 | } |
| 7044 | } |
| 7045 | |
| 7046 | func testJetStream_ClusterMultipleQueueSubscribe(t *testing.T, subject string, srvs ...*jsServer) { |
| 7047 | srv := srvs[0] |
nothing calls this directly
no test coverage detected