Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
piotrpio and Jarema authored Jan 29, 2025
1 parent 7a59b11 commit 326f2b4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
12 changes: 6 additions & 6 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,21 @@ type (
// If consumer does not exist, ErrConsumerNotFound is returned.
DeleteConsumer(ctx context.Context, stream string, consumer string) error

// CreateOrUpdateConsumer creates a push consumer on a given stream with
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to operate on a
// consumer (e.g. fetch messages).
CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error)

// CreateConsumer creates a push consumer on a given stream with given
// CreatePushConsumer creates a push consumer on a given stream with given
// config. If consumer already exists and the provided configuration
// differs from its configuration, ErrConsumerExists is returned. If the
// provided configuration is the same as the existing consumer, the
// existing consumer is returned. Consumer interface is returned,
// allowing to consume messages.
CreatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error)

// UpdateConsumer updates an existing push consumer. If consumer does not
// UpdatePushConsumer updates an existing push consumer. If consumer does not
// exist, ErrConsumerDoesNotExist is returned. Consumer interface is
// returned, allowing to consume messages.
UpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error)
Expand Down Expand Up @@ -808,7 +808,7 @@ func (js *jetStream) DeleteConsumer(ctx context.Context, stream string, name str
return deleteConsumer(ctx, js, stream, name)
}

// CreateOrUpdateConsumer creates a push consumer on a given stream with
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to consume messages.
func (js *jetStream) CreateOrUpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error) {
Expand All @@ -831,7 +831,7 @@ func (js *jetStream) CreatePushConsumer(ctx context.Context, stream string, cfg
return upsertPushConsumer(ctx, js, stream, cfg, consumerActionCreate)
}

// UpdateConsumer updates an existing push consumer. If consumer does not
// UpdatePushConsumer updates an existing push consumer. If consumer does not
// exist, ErrConsumerDoesNotExist is returned. Consumer interface is
// returned, allowing to consume messages.
func (js *jetStream) UpdatePushConsumer(ctx context.Context, stream string, cfg ConsumerConfig) (PushConsumer, error) {
Expand All @@ -841,7 +841,7 @@ func (js *jetStream) UpdatePushConsumer(ctx context.Context, stream string, cfg
return upsertPushConsumer(ctx, js, stream, cfg, consumerActionUpdate)
}

// Consumer returns an interface to an existing consumer, allowing processing
// PushConsumer returns an interface to an existing consumer, allowing processing
// of messages. If consumer does not exist, ErrConsumerNotFound is
// returned.
func (js *jetStream) PushConsumer(ctx context.Context, stream string, name string) (PushConsumer, error) {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,7 @@ func (consumeOpts *consumeOpts) setDefaults(ordered bool) error {
}
}
if consumeOpts.Heartbeat > consumeOpts.Expires/2 {
return fmt.Errorf("%w: the value of Heartbeat must be less than 50%% of expiry", ErrInvalidOption)
return fmt.Errorf("%w: the value of Heartbeat must be less than 50% of expiry", ErrInvalidOption)

Check failure on line 1025 in jetstream/pull.go

View workflow job for this annotation

GitHub Actions / lint

fmt.Errorf format % o reads arg #2, but call has 1 arg
}
return nil
}
10 changes: 5 additions & 5 deletions jetstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type (
// channel of consumer names.
ConsumerNames(context.Context) ConsumerNameLister

// CreateOrUpdateConsumer creates a push consumer on a given stream with
// CreateOrUpdatePushConsumer creates a push consumer on a given stream with
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to operate on a
// consumer (e.g. fetch messages).
Expand All @@ -125,7 +125,7 @@ type (
// allowing to consume messages.
CreatePushConsumer(ctx context.Context, cfg ConsumerConfig) (PushConsumer, error)

// UpdateConsumer updates an existing push consumer. If consumer does not
// UpdatePushConsumer updates an existing push consumer. If consumer does not
// exist, ErrConsumerDoesNotExist is returned. Consumer interface is
// returned, allowing to consume messages.
UpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (PushConsumer, error)
Expand Down Expand Up @@ -289,15 +289,15 @@ func (s *stream) UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consum
return upsertPullConsumer(ctx, s.jetStream, s.name, cfg, consumerActionUpdate)
}

// CreateOrUpdateConsumer creates a consumer on a given stream with
// CreateOrUpdatePushConsumer creates a consumer on a given stream with
// given config. If consumer already exists, it will be updated (if
// possible). Consumer interface is returned, allowing to operate on a
// consumer (e.g. fetch messages).
func (s *stream) CreateOrUpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (PushConsumer, error) {
return upsertPushConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreateOrUpdate)
}

// CreateConsumer creates a consumer on a given stream with given
// CreatePushConsumer creates a consumer on a given stream with given
// config. If consumer already exists and the provided configuration
// differs from its configuration, ErrConsumerExists is returned. If the
// provided configuration is the same as the existing consumer, the
Expand All @@ -307,7 +307,7 @@ func (s *stream) CreatePushConsumer(ctx context.Context, cfg ConsumerConfig) (Pu
return upsertPushConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreate)
}

// UpdateConsumer updates an existing consumer. If consumer does not
// UpdatePushConsumer updates an existing consumer. If consumer does not
// exist, ErrConsumerDoesNotExist is returned. Consumer interface is
// returned, allowing to operate on a consumer (e.g. fetch messages).
func (s *stream) UpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (PushConsumer, error) {
Expand Down

0 comments on commit 326f2b4

Please sign in to comment.