MCPcopy
hub / github.com/nats-io/nats.go / TestAsyncSubscriptionPendingDrain

Function TestAsyncSubscriptionPendingDrain

test/sub_test.go:1216–1254  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1214}
1215
1216func TestAsyncSubscriptionPendingDrain(t *testing.T) {
1217 s := RunDefaultServer()
1218 defer s.Shutdown()
1219
1220 nc := NewDefaultConnection(t)
1221 defer nc.Close()
1222
1223 // Send some messages to ourselves.
1224 total := 100
1225 msg := []byte("0123456789")
1226
1227 sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) {})
1228 defer sub.Unsubscribe()
1229
1230 for range total {
1231 nc.Publish("foo", msg)
1232 }
1233 nc.Flush()
1234
1235 // Wait for all delivered.
1236 waitFor(t, 2*time.Second, 15*time.Millisecond, func() error {
1237 if d, _ := sub.Delivered(); d != int64(total) {
1238 return fmt.Errorf("Wrong delivered count: %v vs %v", d, total)
1239 }
1240 m, b, _ := sub.Pending()
1241 if m != 0 {
1242 return fmt.Errorf("Expected msgs of 0, got %d", m)
1243 }
1244 if b != 0 {
1245 return fmt.Errorf("Expected bytes of 0, got %d", b)
1246 }
1247 return nil
1248 })
1249
1250 sub.Unsubscribe()
1251 if _, err := sub.Delivered(); err == nil {
1252 t.Fatal("Calling Delivered() on closed subscription should fail")
1253 }
1254}
1255
1256func TestSyncSubscriptionPendingDrain(t *testing.T) {
1257 s := RunDefaultServer()

Callers

nothing calls this directly

Calls 11

waitForFunction · 0.85
UnsubscribeMethod · 0.80
DeliveredMethod · 0.80
ErrorfMethod · 0.80
PendingMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected