(nc *nats.Conn, startwg, donewg *sync.WaitGroup, numMsgs int, msgSize int)
| 171 | } |
| 172 | |
| 173 | func runSubscriber(nc *nats.Conn, startwg, donewg *sync.WaitGroup, numMsgs int, msgSize int) { |
| 174 | args := flag.Args() |
| 175 | subj := args[0] |
| 176 | |
| 177 | received := 0 |
| 178 | ch := make(chan time.Time, 2) |
| 179 | sub, _ := nc.Subscribe(subj, func(msg *nats.Msg) { |
| 180 | received++ |
| 181 | if received == 1 { |
| 182 | ch <- time.Now() |
| 183 | } |
| 184 | if received >= numMsgs { |
| 185 | ch <- time.Now() |
| 186 | } |
| 187 | }) |
| 188 | sub.SetPendingLimits(-1, -1) |
| 189 | nc.Flush() |
| 190 | startwg.Done() |
| 191 | |
| 192 | start := <-ch |
| 193 | end := <-ch |
| 194 | benchmark.AddSubSample(bench.NewSample(numMsgs, msgSize, start, end, nc)) |
| 195 | nc.Close() |
| 196 | donewg.Done() |
| 197 | } |
no test coverage detected