From 05b9311b3a8e4c36369ccf5d1a6ef08ce961f52e Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 30 Jan 2025 13:28:10 +0100 Subject: [PATCH] Update readme for jetstream Signed-off-by: Piotr Piotrowski --- jetstream/README.md | 117 ++++++++++++++++++++++++++++++++------------ 1 file changed, 86 insertions(+), 31 deletions(-) diff --git a/jetstream/README.md b/jetstream/README.md index 28926a842..d7306f286 100644 --- a/jetstream/README.md +++ b/jetstream/README.md @@ -6,33 +6,30 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client. - [Overview](#overview) - [Basic usage](#basic-usage) - [Streams](#streams) -- [Stream management (CRUD)](#stream-management-crud) -- [Listing streams and stream names](#listing-streams-and-stream-names) -- [Stream-specific operations](#stream-specific-operations) + - [Stream management (CRUD)](#stream-management-crud) + - [Listing streams and stream names](#listing-streams-and-stream-names) + - [Stream-specific operations](#stream-specific-operations) - [Consumers](#consumers) -- [Consumers management](#consumers-management) -- [Listing consumers and consumer - names](#listing-consumers-and-consumer-names) -- [Ordered consumers](#ordered-consumers) -- [Receiving messages from the - consumer](#receiving-messages-from-the-consumer) - - [Single fetch](#single-fetch) - - [Continuous polling](#continuous-polling) - - [Using `Consume()` receive messages in a - callback](#using-consume-receive-messages-in-a-callback) - - [Using `Messages()` to iterate over incoming - messages](#using-messages-to-iterate-over-incoming-messages) + - [Consumers management](#consumers-management) + - [Listing consumers and consumer names](#listing-consumers-and-consumer-names) + - [Ordered consumers](#ordered-consumers) + - [Receiving messages from pull consumers](#receiving-messages-from-pull-consumers) + - [Single fetch](#single-fetch) + - [Continuous polling](#continuous-polling) + - [Using `Consume()` receive messages in a callback](#using-consume-receive-messages-in-a-callback) + - [Using `Messages()` to iterate over incoming messages](#using-messages-to-iterate-over-incoming-messages) + - [Receiving messages from push consumers](#receiving-messages-from-push-consumers) - [Publishing on stream](#publishing-on-stream) -- [Synchronous publish](#synchronous-publish) -- [Async publish](#async-publish) + - [Synchronous publish](#synchronous-publish) + - [Async publish](#async-publish) - [KeyValue Store](#keyvalue-store) -- [Basic usage of KV bucket](#basic-usage-of-kv-bucket) -- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket) -- [Additional operations on a bucket](#additional-operations-on-a-bucket) + - [Basic usage of KV bucket](#basic-usage-of-kv-bucket) + - [Watching for changes on a bucket](#watching-for-changes-on-a-bucket) + - [Additional operations on a bucket](#additional-operations-on-a-bucket) - [Object Store](#object-store) -- [Basic usage of Object Store](#basic-usage-of-object-store) -- [Watching for changes on a store](#watching-for-changes-on-a-store) -- [Additional operations on a store](#additional-operations-on-a-store) + - [Basic usage of Object Store](#basic-usage-of-object-store) + - [Watching for changes on a store](#watching-for-changes-on-a-store) + - [Additional operations on a store](#additional-operations-on-a-store) - [Examples](#examples) ## Overview @@ -254,14 +251,34 @@ fmt.Println(cachedInfo.Config.Name) ## Consumers -Only pull consumers are supported in `jetstream` package. However, unlike the -JetStream API in `nats` package, pull consumers allow for continuous message -retrieval (similarly to how `nats.Subscribe()` works). Because of that, push -consumers can be easily replaced by pull consumers for most of the use cases. +Both pull and push consumers are supported in `jetstream` package. For most use +cases, we recommend using pull consumers as they allow for more fine-grained +control over the message processing and can often prevent issues such as e.g. +slow consumers. However, unlike the JetStream API in `nats` package, pull +consumers allow for continuous message retrieval (similarly to how +`nats.Subscribe()` works). Because of that, push consumers can be easily +replaced by pull consumers for most of the use cases. Push consumers are +supported mainly for the purpose of ease of migration from `nats` package. The +interfaces for consuming messages via push and pull consumers are similar, with +the main difference being that push consumers do not support fetching individual +batches of messages. ### Consumers management -CRUD operations on consumers can be achieved on 2 levels: +Both pull and push consumers can be managed using `jetstream` package. The +following example demonstrates how to create, update, fetch and delete a pull +consumer. Push consumers can be managed in a similar way, with method names +containing `Push` (e.g. `CreatePushConsumer`, `UpdatePushConsumer`, +`DeletePushConsumer`). + +> __NOTE__: It is important to use `CreateConsumer` and `CreatePushConsumer` +methods to create the respective consumer types as they return the correct +interface (different for push and pull consumers). `DeliverSubject` is mandatory +when creating a push consumer and cannot be provided when creating a pull +consumer. Similarly, an attempt to get a push consumer using `Consumer` method +will result in an error (and vice versa). + +CRUD operations on pull consumers can be achieved on 2 levels: - on `JetStream` interface @@ -370,6 +387,8 @@ message ordering. It is also resilient to consumer deletion. Ordered consumers present the same set of message consumption methods as standard pull consumers. +> __NOTE__: Ordered consumers are not supported for push consumers. + ```go js, _ := jetstream.New(nc) @@ -380,7 +399,7 @@ cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{ }) ``` -### Receiving messages from the consumer +### Receiving messages from pull consumers The `Consumer` interface covers allows fetching messages on demand, with pre-defined batch size on bytes limit, or continuous push-like receiving of @@ -469,10 +488,12 @@ cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{ AckPolicy: jetstream.AckExplicitPolicy, // receive messages from ORDERS.A subject only FilterSubject: "ORDERS.A" -})) +}) consContext, _ := c.Consume(func(msg jetstream.Msg) { fmt.Printf("Received a JetStream message: %s\n", string(msg.Data())) + // messages are not acknowledged automatically + msg.Ack() }) defer consContext.Stop() ``` @@ -491,7 +512,7 @@ type PullThresholdMessages int buffer - `PullHeartbeat(time.Duration)` - idle heartbeat duration for a single pull request. An error will be triggered if at least 2 heartbeats are missed -- `WithConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a +- `ConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a custom error handler on `Consume()`, allowing e.g. tracking missing heartbeats. @@ -565,6 +586,40 @@ for { } ``` +#### Receiving messages from push consumers + +The `PushConsumer` interface currently only allows message processing in a +callback using `Consume()`. + +As heartbeat for push consumers is not managed when using `Consume()`, it is +important to set `IdleHeartbeat` on the consumer level. Similarly, `FlowControl` +can be set to prevent the consumer from receiving more messages than it can +handle. + +```go +cons, _ := js.CreateOrUpdatePushConsumer("ORDERS", jetstream.ConsumerConfig{ + DeliverSubject: nats.NewInbox() + AckPolicy: jetstream.AckExplicitPolicy, + // receive messages from ORDERS.A subject only + FilterSubject: "ORDERS.A", + // unlike pull consumers, idle heartbeat is configured on the consumer level + IdleHeartbeat: 30 * time.Second +}) + +consContext, _ := c.Consume(func(msg jetstream.Msg) { + fmt.Printf("Received a JetStream message: %s\n", string(msg.Data())) + // messages are not acknowledged automatically + msg.Ack() +}) +defer consContext.Stop() +``` + +`Consume()` on `PushConsumer` can be supplied with `ConsumeErrHandler` option +to set a custom error handler allowing e.g. tracking missing heartbeats. + +> __NOTE__: `Stop()` should always be called on `ConsumeContext` to avoid +> leaking goroutines. + ## Publishing on stream `JetStream` interface allows publishing messages on stream in 2 ways: