MCPcopy
hub / github.com/nats-io/nats.go / TestJetStreamOrderedConsumerWithAutoUnsub

Function TestJetStreamOrderedConsumerWithAutoUnsub

test/js_internal_test.go:230–325  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

228}
229
230func TestJetStreamOrderedConsumerWithAutoUnsub(t *testing.T) {
231 s := RunBasicJetStreamServer()
232 defer shutdownJSServerAndRemoveStorage(t, s)
233
234 nc, js := jsClient(t, s)
235 defer nc.Close()
236
237 var err error
238
239 _, err = js.AddStream(&nats.StreamConfig{
240 Name: "OBJECT",
241 Subjects: []string{"a"},
242 Storage: nats.MemoryStorage,
243 })
244 if err != nil {
245 t.Fatalf("Unexpected error: %v", err)
246 }
247
248 count := int32(0)
249 sub, err := js.Subscribe("a", func(m *nats.Msg) {
250 atomic.AddInt32(&count, 1)
251 }, nats.OrderedConsumer(), nats.IdleHeartbeat(250*time.Millisecond))
252 if err != nil {
253 t.Fatalf("Unexpected error: %v", err)
254 }
255
256 // Ask to auto-unsub after 10 messages.
257 sub.AutoUnsubscribe(10)
258
259 // Set a message filter that will drop 1 message
260 dm := 0
261 singleLoss := func(m *nats.Msg) *nats.Msg {
262 if m.Header.Get("data") != "" {
263 dm++
264 if dm == 5 {
265 nc.RemoveMsgFilter("a")
266 return nil
267 }
268 }
269 return m
270 }
271 nc.AddMsgFilter("a", singleLoss)
272
273 // Now produce 20 messages
274 for i := 0; i < 20; i++ {
275 msg := nats.NewMsg("a")
276 msg.Data = []byte(fmt.Sprintf("msg_%d", i+1))
277 msg.Header.Set("data", "true")
278 js.PublishMsgAsync(msg)
279 }
280
281 select {
282 case <-js.PublishAsyncComplete():
283 case <-time.After(time.Second):
284 t.Fatalf("Did not receive completion signal")
285 }
286
287 // Wait for the subscription to be marked as invalid

Callers

nothing calls this directly

Calls 15

FatalfMethod · 0.80
AutoUnsubscribeMethod · 0.80
RemoveMsgFilterMethod · 0.80
AddMsgFilterMethod · 0.80
IsValidMethod · 0.80
RunBasicJetStreamServerFunction · 0.70
jsClientFunction · 0.70
AddStreamMethod · 0.65
SubscribeMethod · 0.65
OrderedConsumerMethod · 0.65
GetMethod · 0.65

Tested by

no test coverage detected