Skip to content

Commit

Permalink
[ADDED] Options() and Conn() methods to JetStream (#1792)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Feb 4, 2025
1 parent 16e780e commit faec055
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 152 deletions.
19 changes: 10 additions & 9 deletions jetstream/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,13 @@ func (js *jetStream) apiRequestJSON(ctx context.Context, subject string, resp an

// a RequestWithContext with tracing via TraceCB
func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte) (*jetStreamMsg, error) {
subj = js.apiSubject(subj)
var req []byte
if len(data) > 0 {
req = data[0]
}
if js.clientTrace != nil {
ctrace := js.clientTrace
if js.opts.clientTrace != nil {
ctrace := js.opts.clientTrace
if ctrace.RequestSent != nil {
ctrace.RequestSent(subj, req)
}
Expand All @@ -130,8 +131,8 @@ func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte
if err != nil {
return nil, err
}
if js.clientTrace != nil {
ctrace := js.clientTrace
if js.opts.clientTrace != nil {
ctrace := js.opts.clientTrace
if ctrace.ResponseReceived != nil {
ctrace.ResponseReceived(subj, resp.Data, resp.Header)
}
Expand All @@ -140,12 +141,12 @@ func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte
return js.toJSMsg(resp), nil
}

func apiSubj(prefix, subject string) string {
if prefix == "" {
return subject
func (js *jetStream) apiSubject(subj string) string {
if js.opts.apiPrefix == "" {
return subj
}
var b strings.Builder
b.WriteString(prefix)
b.WriteString(subject)
b.WriteString(js.opts.apiPrefix)
b.WriteString(subj)
return b.String()
}
44 changes: 22 additions & 22 deletions jetstream/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ type (

// Info fetches current ConsumerInfo from the server.
func (p *pullConsumer) Info(ctx context.Context) (*ConsumerInfo, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
ctx, cancel := p.js.wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
infoSubject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiConsumerInfoT, p.stream, p.name))
infoSubject := fmt.Sprintf(apiConsumerInfoT, p.stream, p.name)
var resp consumerInfoResponse

if _, err := p.jetStream.apiRequestJSON(ctx, infoSubject, &resp); err != nil {
if _, err := p.js.apiRequestJSON(ctx, infoSubject, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
Expand All @@ -187,7 +187,7 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo {
}

func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
ctx, cancel := js.wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
Expand Down Expand Up @@ -218,9 +218,9 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
if err := validateSubject(cfg.FilterSubject); err != nil {
return nil, err
}
ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject))
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
} else {
ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateT, stream, consumerName))
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
}
var resp consumerInfoResponse

Expand All @@ -240,12 +240,12 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
}

return &pullConsumer{
jetStream: js,
stream: stream,
name: resp.Name,
durable: cfg.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
js: js,
stream: stream,
name: resp.Name,
durable: cfg.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}, nil
}

Expand All @@ -267,14 +267,14 @@ func generateConsName() string {
}

func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consumer, error) {
ctx, cancel := wrapContextWithoutDeadline(ctx)
ctx, cancel := js.wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(name); err != nil {
return nil, err
}
infoSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerInfoT, stream, name))
infoSubject := fmt.Sprintf(apiConsumerInfoT, stream, name)

var resp consumerInfoResponse

Expand All @@ -292,26 +292,26 @@ func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consu
}

cons := &pullConsumer{
jetStream: js,
stream: stream,
name: name,
durable: resp.Config.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
js: js,
stream: stream,
name: name,
durable: resp.Config.Durable != "",
info: resp.ConsumerInfo,
subs: syncx.Map[string, *pullSubscription]{},
}

return cons, nil
}

func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string) error {
ctx, cancel := wrapContextWithoutDeadline(ctx)
ctx, cancel := js.wrapContextWithoutDeadline(ctx)
if cancel != nil {
defer cancel()
}
if err := validateConsumerName(consumer); err != nil {
return err
}
deleteSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
deleteSubject := fmt.Sprintf(apiConsumerDeleteT, stream, consumer)

var resp consumerDeleteResponse

Expand Down
Loading

0 comments on commit faec055

Please sign in to comment.