MCPcopy
hub / github.com/nats-io/nats.go / TestAsyncSubscriptionPending

Function TestAsyncSubscriptionPending

test/sub_test.go:1129–1214  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

1127}
1128
1129func TestAsyncSubscriptionPending(t *testing.T) {
1130 s := RunDefaultServer()
1131 defer s.Shutdown()
1132
1133 nc := NewDefaultConnection(t)
1134 defer nc.Close()
1135
1136 // Send some messages to ourselves.
1137 total := 100
1138 msg := []byte("0123456789")
1139
1140 inCb := make(chan bool)
1141 block := make(chan bool)
1142
1143 sub, _ := nc.Subscribe("foo", func(m *nats.Msg) {
1144 inCb <- true
1145 <-block
1146 // Avoid repeated calls to this callback
1147 m.Sub.Unsubscribe()
1148 })
1149 defer sub.Unsubscribe()
1150
1151 for range total {
1152 nc.Publish("foo", msg)
1153 }
1154 nc.Flush()
1155
1156 // Wait that a message is received, so checks are safe
1157 if err := Wait(inCb); err != nil {
1158 t.Fatal("No message received")
1159 }
1160
1161 // Test old way
1162 q, _, _ := sub.Pending()
1163 if q != total && q != total-1 {
1164 t.Fatalf("Expected %d or %d, got %d", total, total-1, q)
1165 }
1166
1167 // New way, make sure the same and check bytes.
1168 m, b, _ := sub.Pending()
1169 mlen := len(msg)
1170 totalSize := total * mlen
1171
1172 if m != total && m != total-1 {
1173 t.Fatalf("Expected msgs of %d or %d, got %d", total, total-1, m)
1174 }
1175 if b != totalSize && b != totalSize-mlen {
1176 t.Fatalf("Expected bytes of %d or %d, got %d",
1177 totalSize, totalSize-mlen, b)
1178 }
1179
1180 // Make sure max has been set. Since we block after the first message is
1181 // received, MaxPending should be >= total - 1 and <= total
1182 mm, bm, _ := sub.MaxPending()
1183 if mm < total-1 || mm > total {
1184 t.Fatalf("Expected max msgs (%d) to be between %d and %d",
1185 mm, total-1, total)
1186 }

Callers

nothing calls this directly

Calls 12

UnsubscribeMethod · 0.80
PendingMethod · 0.80
FatalfMethod · 0.80
MaxPendingMethod · 0.80
ClearMaxPendingMethod · 0.80
RunDefaultServerFunction · 0.70
NewDefaultConnectionFunction · 0.70
WaitFunction · 0.70
SubscribeMethod · 0.65
PublishMethod · 0.65
CloseMethod · 0.45
FlushMethod · 0.45

Tested by

no test coverage detected