hub / github.com/nats-io/nats.go / ExampleJetStream

Function ExampleJetStream

example_test.go:230–328

Source from the content-addressed store, hash-verified

func ExampleJetStream() {
	nc, err := nats.Connect("localhost")
	if err != nil {
		log.Fatal(err)
	}

	// Use the JetStream context to produce and consume messages
	// that have been persisted.
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatal(err)
	}

	js.AddStream(&nats.StreamConfig{
		Name:     "FOO",
		Subjects: []string{"foo"},
	})

	js.Publish("foo", []byte("Hello JS!"))

	// Publish messages asynchronously.
	for i := 0; i < 500; i++ {
		js.PublishAsync("foo", []byte("Hello JS Async!"))
	}
	select {
	case <-js.PublishAsyncComplete():
	case <-time.After(5 * time.Second):
		fmt.Println("Did not resolve in time")
	}

	// Create async consumer on subject 'foo'. Async subscribers
	// ack a message once exiting the callback.
	js.Subscribe("foo", func(msg *nats.Msg) {
		meta, _ := msg.Metadata()
		fmt.Printf("Stream Sequence  : %v\n", meta.Sequence.Stream)
		fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer)
	})

	// Async subscriber with manual acks.
	js.Subscribe("foo", func(msg *nats.Msg) {
		msg.Ack()
	}, nats.ManualAck())

	// Async queue subscription where members load balance the
	// received messages together.
	// If no consumer name is specified, either with nats.Bind()
	// or nats.Durable() options, the queue name is used as the
	// durable name (that is, as if you were passing the
	// nats.Durable(<queue group name>) option).
	// It is recommended to use nats.Bind() or nats.Durable()
	// and preferably create the JetStream consumer beforehand
	// (using js.AddConsumer) so that the JS consumer is not
	// deleted on an Unsubscribe() or Drain() when the member
	// that created the consumer goes away first.
	// Check Godoc for the QueueSubscribe() API for more details.
	js.QueueSubscribe("foo", "group", func(msg *nats.Msg) {
		msg.Ack()
	}, nats.ManualAck())
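
The queue subscription comment above says that members of a group "load balance the received messages together". The following is a minimal, standard-library-only sketch of that delivery shape: every message is handled by exactly one member, never by all of them. It does not use nats.go and is not how the library or server implements queue groups. The helpers `deliver` and `total` are hypothetical names introduced for illustration, and a plain Go channel stands in for the stream.

```go
package main

import (
	"fmt"
	"sync"
)

// deliver simulates queue-group style delivery: each message is received
// by exactly one of the members. It returns how many messages each member
// handled. Hypothetical helper, not part of nats.go.
func deliver(members, messages int) []int {
	work := make(chan int) // stands in for the stream subject
	counts := make([]int, members)
	var wg sync.WaitGroup

	for m := 0; m < members; m++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each receive takes a message away from the other members.
			for range work {
				counts[id]++ // only this goroutine writes counts[id]
			}
		}(m)
	}

	for i := 0; i < messages; i++ {
		work <- i
	}
	close(work)
	wg.Wait()
	return counts
}

// total sums the per-member counts.
func total(counts []int) int {
	sum := 0
	for _, c := range counts {
		sum += c
	}
	return sum
}

func main() {
	counts := deliver(3, 500)
	fmt.Println("messages handled in total:", total(counts))
}
```

How the 500 messages split across the three members varies from run to run, but the total is always 500 because no message is delivered twice. With two plain (non-queue) subscriptions, by contrast, each subscriber would see every message.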

Callers

nothing calls this directly

Calls 15

Connect · Method · 0.80
JetStream · Method · 0.80
NextMsg · Method · 0.80
AddStream · Method · 0.65
Publish · Method · 0.65
PublishAsync · Method · 0.65
PublishAsyncComplete · Method · 0.65
Subscribe · Method · 0.65
Metadata · Method · 0.65
Ack · Method · 0.65
QueueSubscribe · Method · 0.65
SubscribeSync · Method · 0.65

Tested by

no test coverage detected