Skip to content

Commit

Permalink
Update readme for jetstream
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jan 30, 2025
1 parent 6a540ca commit 05b9311
Showing 1 changed file with 86 additions and 31 deletions.
117 changes: 86 additions & 31 deletions jetstream/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,30 @@ This doc covers the basic usage of the `jetstream` package in `nats.go` client.
- [Overview](#overview)
- [Basic usage](#basic-usage)
- [Streams](#streams)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Stream management (CRUD)](#stream-management-crud)
- [Listing streams and stream names](#listing-streams-and-stream-names)
- [Stream-specific operations](#stream-specific-operations)
- [Consumers](#consumers)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer
names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from the
consumer](#receiving-messages-from-the-consumer)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a
callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming
messages](#using-messages-to-iterate-over-incoming-messages)
- [Consumers management](#consumers-management)
- [Listing consumers and consumer names](#listing-consumers-and-consumer-names)
- [Ordered consumers](#ordered-consumers)
- [Receiving messages from pull consumers](#receiving-messages-from-pull-consumers)
- [Single fetch](#single-fetch)
- [Continuous polling](#continuous-polling)
- [Using `Consume()` receive messages in a callback](#using-consume-receive-messages-in-a-callback)
- [Using `Messages()` to iterate over incoming messages](#using-messages-to-iterate-over-incoming-messages)
- [Receiving messages from push consumers](#receiving-messages-from-push-consumers)
- [Publishing on stream](#publishing-on-stream)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [Synchronous publish](#synchronous-publish)
- [Async publish](#async-publish)
- [KeyValue Store](#keyvalue-store)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Basic usage of KV bucket](#basic-usage-of-kv-bucket)
- [Watching for changes on a bucket](#watching-for-changes-on-a-bucket)
- [Additional operations on a bucket](#additional-operations-on-a-bucket)
- [Object Store](#object-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Basic usage of Object Store](#basic-usage-of-object-store)
- [Watching for changes on a store](#watching-for-changes-on-a-store)
- [Additional operations on a store](#additional-operations-on-a-store)
- [Examples](#examples)

## Overview
Expand Down Expand Up @@ -254,14 +251,34 @@ fmt.Println(cachedInfo.Config.Name)

## Consumers

Only pull consumers are supported in `jetstream` package. However, unlike the
JetStream API in `nats` package, pull consumers allow for continuous message
retrieval (similarly to how `nats.Subscribe()` works). Because of that, push
consumers can be easily replaced by pull consumers for most of the use cases.
Both pull and push consumers are supported in `jetstream` package. For most use
cases, we recommend using pull consumers as they allow for more fine-grained
control over the message processing and can often prevent issues such as e.g.
slow consumers. However, unlike the JetStream API in `nats` package, pull
consumers allow for continuous message retrieval (similarly to how
`nats.Subscribe()` works). Because of that, push consumers can be easily
replaced by pull consumers for most of the use cases. Push consumers are
supported mainly for the purpose of ease of migration from `nats` package. The
interfaces for consuming messages via push and pull consumers are similar, with
the main difference being that push consumers do not support fetching individual
batches of messages.

### Consumers management

CRUD operations on consumers can be achieved on 2 levels:
Both pull and push consumers can be managed using `jetstream` package. The
following example demonstrates how to create, update, fetch and delete a pull
consumer. Push consumers can be managed in a similar way, with method names
containing `Push` (e.g. `CreatePushConsumer`, `UpdatePushConsumer`,
`DeletePushConsumer`).

> __NOTE__: It is important to use `CreateConsumer` and `CreatePushConsumer`
methods to create the respective consumer types as they return the correct
interface (different for push and pull consumers). `DeliverSubject` is mandatory
when creating a push consumer and cannot be provided when creating a pull
consumer. Similarly, an attempt to get a push consumer using `Consumer` method
will result in an error (and vice versa).

CRUD operations on pull consumers can be achieved on 2 levels:

- on `JetStream` interface

Expand Down Expand Up @@ -370,6 +387,8 @@ message ordering. It is also resilient to consumer deletion.
Ordered consumers present the same set of message consumption methods as
standard pull consumers.

> __NOTE__: Ordered consumers are not supported for push consumers.
```go
js, _ := jetstream.New(nc)

Expand All @@ -380,7 +399,7 @@ cons, _ := js.OrderedConsumer(ctx, "ORDERS", jetstream.OrderedConsumerConfig{
})
```

### Receiving messages from the consumer
### Receiving messages from pull consumers

The `Consumer` interface covers allows fetching messages on demand, with
pre-defined batch size on bytes limit, or continuous push-like receiving of
Expand Down Expand Up @@ -469,10 +488,12 @@ cons, _ := js.CreateOrUpdateConsumer("ORDERS", jetstream.ConsumerConfig{
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A"
}))
})

consContext, _ := c.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
// messages are not acknowledged automatically
msg.Ack()
})
defer consContext.Stop()
```
Expand All @@ -491,7 +512,7 @@ type PullThresholdMessages int
buffer
- `PullHeartbeat(time.Duration)` - idle heartbeat duration for a single pull
request. An error will be triggered if at least 2 heartbeats are missed
- `WithConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a
- `ConsumeErrHandler(func (ConsumeContext, error))` - when used, sets a
custom error handler on `Consume()`, allowing e.g. tracking missing
heartbeats.

Expand Down Expand Up @@ -565,6 +586,40 @@ for {
}
```

#### Receiving messages from push consumers

The `PushConsumer` interface currently only allows message processing in a
callback using `Consume()`.

As heartbeat for push consumers is not managed when using `Consume()`, it is
important to set `IdleHeartbeat` on the consumer level. Similarly, `FlowControl`
can be set to prevent the consumer from receiving more messages than it can
handle.

```go
cons, _ := js.CreateOrUpdatePushConsumer("ORDERS", jetstream.ConsumerConfig{
DeliverSubject: nats.NewInbox()
AckPolicy: jetstream.AckExplicitPolicy,
// receive messages from ORDERS.A subject only
FilterSubject: "ORDERS.A",
// unlike pull consumers, idle heartbeat is configured on the consumer level
IdleHeartbeat: 30 * time.Second
})

consContext, _ := c.Consume(func(msg jetstream.Msg) {
fmt.Printf("Received a JetStream message: %s\n", string(msg.Data()))
// messages are not acknowledged automatically
msg.Ack()
})
defer consContext.Stop()
```

`Consume()` on `PushConsumer` can be supplied with `ConsumeErrHandler` option
to set a custom error handler allowing e.g. tracking missing heartbeats.

> __NOTE__: `Stop()` should always be called on `ConsumeContext` to avoid
> leaking goroutines.
## Publishing on stream

`JetStream` interface allows publishing messages on stream in 2 ways:
Expand Down

0 comments on commit 05b9311

Please sign in to comment.