(t *testing.T)
| 228 | } |
| 229 | |
| 230 | func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) { |
| 231 | s := RunBasicJetStreamServer() |
| 232 | defer shutdownJSServerAndRemoveStorage(t, s) |
| 233 | |
| 234 | nc, js := jsClient(t, s) |
| 235 | defer nc.Close() |
| 236 | |
| 237 | var err error |
| 238 | |
| 239 | _, err = js.AddStream(&nats.StreamConfig{ |
| 240 | Name: "OBJECT", |
| 241 | Subjects: []string{"a"}, |
| 242 | Storage: nats.MemoryStorage, |
| 243 | }) |
| 244 | if err != nil { |
| 245 | t.Fatalf("Unexpected error: %v", err) |
| 246 | } |
| 247 | |
| 248 | count := int32(0) |
| 249 | sub, err := js.Subscribe("a", func(m *nats.Msg) { |
| 250 | atomic.AddInt32(&count, 1) |
| 251 | }, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond)) |
| 252 | if err != nil { |
| 253 | t.Fatalf("Unexpected error: %v", err) |
| 254 | } |
| 255 | |
| 256 | // Ask to auto-unsub after 10 messages. |
| 257 | sub.AutoUnsubscribe(10) |
| 258 | |
| 259 | // Set a message filter that will drop 1 message |
| 260 | dm := 0 |
| 261 | singleLoss := func(m *nats.Msg) *nats.Msg { |
| 262 | if m.Header.Get("data") != "" { |
| 263 | dm++ |
| 264 | if dm == 5 { |
| 265 | nc.RemoveMsgFilter("a") |
| 266 | return nil |
| 267 | } |
| 268 | } |
| 269 | return m |
| 270 | } |
| 271 | nc.AddMsgFilter("a", singleLoss) |
| 272 | |
| 273 | // Now produce 20 messages |
| 274 | for i := 0; i < 20; i++ { |
| 275 | msg := nats.NewMsg("a") |
| 276 | msg.Data = []byte(fmt.Sprintf("msg_%d", i+1)) |
| 277 | msg.Header.Set("data", "true") |
| 278 | js.PublishMsgAsync(msg) |
| 279 | } |
| 280 | |
| 281 | select { |
| 282 | case <-js.PublishAsyncComplete(): |
| 283 | case <-time.After(time.Second): |
| 284 | t.Fatalf("Did not receive completion signal") |
| 285 | } |
| 286 | |
| 287 | // Wait for the subscription to be marked as invalid |
nothing calls this directly
no test coverage detected