diff --git a/jetstream/api.go b/jetstream/api.go index 1cea088ed..a948cd4ec 100644 --- a/jetstream/api.go +++ b/jetstream/api.go @@ -116,12 +116,13 @@ func (js *jetStream) apiRequestJSON(ctx context.Context, subject string, resp an // a RequestWithContext with tracing via TraceCB func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte) (*jetStreamMsg, error) { + subj = js.apiSubject(subj) var req []byte if len(data) > 0 { req = data[0] } - if js.clientTrace != nil { - ctrace := js.clientTrace + if js.opts.clientTrace != nil { + ctrace := js.opts.clientTrace if ctrace.RequestSent != nil { ctrace.RequestSent(subj, req) } @@ -130,8 +131,8 @@ func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte if err != nil { return nil, err } - if js.clientTrace != nil { - ctrace := js.clientTrace + if js.opts.clientTrace != nil { + ctrace := js.opts.clientTrace if ctrace.ResponseReceived != nil { ctrace.ResponseReceived(subj, resp.Data, resp.Header) } @@ -140,12 +141,12 @@ func (js *jetStream) apiRequest(ctx context.Context, subj string, data ...[]byte return js.toJSMsg(resp), nil } -func apiSubj(prefix, subject string) string { - if prefix == "" { - return subject +func (js *jetStream) apiSubject(subj string) string { + if js.opts.apiPrefix == "" { + return subj } var b strings.Builder - b.WriteString(prefix) - b.WriteString(subject) + b.WriteString(js.opts.apiPrefix) + b.WriteString(subj) return b.String() } diff --git a/jetstream/consumer.go b/jetstream/consumer.go index 2a36c3ef6..794d02b88 100644 --- a/jetstream/consumer.go +++ b/jetstream/consumer.go @@ -155,14 +155,14 @@ type ( // Info fetches current ConsumerInfo from the server. func (p *pullConsumer) Info(ctx context.Context) (*ConsumerInfo, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := p.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } - infoSubject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiConsumerInfoT, p.stream, p.name)) + infoSubject := fmt.Sprintf(apiConsumerInfoT, p.stream, p.name) var resp consumerInfoResponse - if _, err := p.jetStream.apiRequestJSON(ctx, infoSubject, &resp); err != nil { + if _, err := p.js.apiRequestJSON(ctx, infoSubject, &resp); err != nil { return nil, err } if resp.Error != nil { @@ -187,7 +187,7 @@ func (p *pullConsumer) CachedInfo() *ConsumerInfo { } func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg ConsumerConfig, action string) (Consumer, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -218,9 +218,9 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu if err := validateSubject(cfg.FilterSubject); err != nil { return nil, err } - ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)) + ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject) } else { - ccSubj = apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerCreateT, stream, consumerName)) + ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName) } var resp consumerInfoResponse @@ -240,12 +240,12 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu } return &pullConsumer{ - jetStream: js, - stream: stream, - name: resp.Name, - durable: cfg.Durable != "", - info: resp.ConsumerInfo, - subs: syncx.Map[string, *pullSubscription]{}, + js: js, + stream: stream, + name: resp.Name, + durable: cfg.Durable != "", + info: resp.ConsumerInfo, + subs: syncx.Map[string, *pullSubscription]{}, }, nil } @@ -267,14 +267,14 @@ func generateConsName() string { } func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consumer, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } if err := validateConsumerName(name); err != nil { return nil, err } - infoSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerInfoT, stream, name)) + infoSubject := fmt.Sprintf(apiConsumerInfoT, stream, name) var resp consumerInfoResponse @@ -292,26 +292,26 @@ func getConsumer(ctx context.Context, js *jetStream, stream, name string) (Consu } cons := &pullConsumer{ - jetStream: js, - stream: stream, - name: name, - durable: resp.Config.Durable != "", - info: resp.ConsumerInfo, - subs: syncx.Map[string, *pullSubscription]{}, + js: js, + stream: stream, + name: name, + durable: resp.Config.Durable != "", + info: resp.ConsumerInfo, + subs: syncx.Map[string, *pullSubscription]{}, } return cons, nil } func deleteConsumer(ctx context.Context, js *jetStream, stream, consumer string) error { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } if err := validateConsumerName(consumer); err != nil { return err } - deleteSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiConsumerDeleteT, stream, consumer)) + deleteSubject := fmt.Sprintf(apiConsumerDeleteT, stream, consumer) var resp consumerDeleteResponse diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 7099271ef..7daa6fd73 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -49,6 +49,13 @@ type ( // returned. AccountInfo(ctx context.Context) (*AccountInfo, error) + // Conn returns the underlying NATS connection. + Conn() *nats.Conn + + // Options returns read-only JetStreamOptions used + // when making requests to JetStream. + Options() JetStreamOptions + StreamConsumerManager StreamManager Publisher @@ -274,17 +281,27 @@ type ( jetStream struct { conn *nats.Conn - jsOpts + opts JetStreamOptions publisher *jetStreamClient } // JetStreamOpt is a functional option for [New], [NewWithAPIPrefix] and // [NewWithDomain] methods. - JetStreamOpt func(*jsOpts) error + JetStreamOpt func(*JetStreamOptions) error + + // JetStreamOptions are used to configure JetStream. + JetStreamOptions struct { + // APIPrefix is the prefix used for JetStream API requests. + APIPrefix string + + // Domain is the domain name token used when sending JetStream requests. + Domain string + + publisherOpts asyncPublisherOpts - jsOpts struct { - publisherOpts asyncPublisherOpts + // this is the actual prefix used in the API requests + // it is either APIPrefix or a domain specific prefix apiPrefix string replyPrefix string replyPrefixLen int @@ -379,7 +396,7 @@ var subjectRegexp = regexp.MustCompile(`^[^ >]*[>]?$`) // - [WithPublishAsyncMaxPending] - sets the maximum outstanding async publishes // that can be inflight at one time. func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error) { - jsOpts := jsOpts{ + jsOpts := JetStreamOptions{ apiPrefix: DefaultAPIPrefix, publisherOpts: asyncPublisherOpts{ maxpa: defaultAsyncPubAckInflight, @@ -393,7 +410,7 @@ func New(nc *nats.Conn, opts ...JetStreamOpt) (JetStream, error) { } js := &jetStream{ conn: nc, - jsOpts: jsOpts, + opts: jsOpts, publisher: &jetStreamClient{asyncPublisherOpts: jsOpts.publisherOpts}, } @@ -405,7 +422,7 @@ const ( defaultAsyncPubAckInflight = 4000 ) -func setReplyPrefix(nc *nats.Conn, jsOpts *jsOpts) { +func setReplyPrefix(nc *nats.Conn, jsOpts *JetStreamOptions) { jsOpts.replyPrefix = nats.InboxPrefix if nc.Opts.InboxPrefix != "" { jsOpts.replyPrefix = nc.Opts.InboxPrefix + "." @@ -424,10 +441,11 @@ func setReplyPrefix(nc *nats.Conn, jsOpts *jsOpts) { // - [WithPublishAsyncMaxPending] - sets the maximum outstanding async publishes // that can be inflight at one time. func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (JetStream, error) { - jsOpts := jsOpts{ + jsOpts := JetStreamOptions{ publisherOpts: asyncPublisherOpts{ maxpa: defaultAsyncPubAckInflight, }, + APIPrefix: apiPrefix, } setReplyPrefix(nc, &jsOpts) for _, opt := range opts { @@ -440,10 +458,12 @@ func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (Je } if !strings.HasSuffix(apiPrefix, ".") { jsOpts.apiPrefix = fmt.Sprintf("%s.", apiPrefix) + } else { + jsOpts.apiPrefix = apiPrefix } js := &jetStream{ conn: nc, - jsOpts: jsOpts, + opts: jsOpts, publisher: &jetStreamClient{asyncPublisherOpts: jsOpts.publisherOpts}, } return js, nil @@ -458,10 +478,11 @@ func NewWithAPIPrefix(nc *nats.Conn, apiPrefix string, opts ...JetStreamOpt) (Je // - [WithPublishAsyncMaxPending] - sets the maximum outstanding async publishes // that can be inflight at one time. func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStream, error) { - jsOpts := jsOpts{ + jsOpts := JetStreamOptions{ publisherOpts: asyncPublisherOpts{ maxpa: defaultAsyncPubAckInflight, }, + Domain: domain, } setReplyPrefix(nc, &jsOpts) for _, opt := range opts { @@ -475,12 +496,21 @@ func NewWithDomain(nc *nats.Conn, domain string, opts ...JetStreamOpt) (JetStrea jsOpts.apiPrefix = fmt.Sprintf(jsDomainT, domain) js := &jetStream{ conn: nc, - jsOpts: jsOpts, + opts: jsOpts, publisher: &jetStreamClient{asyncPublisherOpts: jsOpts.publisherOpts}, } return js, nil } +// Conn returns the underlying NATS connection. +func (js *jetStream) Conn() *nats.Conn { + return js.conn +} + +func (js *jetStream) Options() JetStreamOptions { + return js.opts +} + // CreateStream creates a new stream with given config and returns an // interface to operate on it. If stream with given name already exists, // ErrStreamNameAlreadyInUse is returned. @@ -488,7 +518,7 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream if err := validateStreamName(cfg.Name); err != nil { return nil, err } - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -520,7 +550,7 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, err } - createSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamCreateT, cfg.Name)) + createSubject := fmt.Sprintf(apiStreamCreateT, cfg.Name) var resp streamInfoResponse if _, err = js.apiRequestJSON(ctx, createSubject, &resp, req); err != nil { @@ -550,9 +580,9 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream } return &stream{ - jetStream: js, - name: cfg.Name, - info: resp.StreamInfo, + js: js, + name: cfg.Name, + info: resp.StreamInfo, }, nil } @@ -590,7 +620,7 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream if err := validateStreamName(cfg.Name); err != nil { return nil, err } - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -600,7 +630,7 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, err } - updateSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamUpdateT, cfg.Name)) + updateSubject := fmt.Sprintf(apiStreamUpdateT, cfg.Name) var resp streamInfoResponse if _, err = js.apiRequestJSON(ctx, updateSubject, &resp, req); err != nil { @@ -630,9 +660,9 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream } return &stream{ - jetStream: js, - name: cfg.Name, - info: resp.StreamInfo, + js: js, + name: cfg.Name, + info: resp.StreamInfo, }, nil } @@ -656,11 +686,11 @@ func (js *jetStream) Stream(ctx context.Context, name string) (Stream, error) { if err := validateStreamName(name); err != nil { return nil, err } - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } - infoSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamInfoT, name)) + infoSubject := fmt.Sprintf(apiStreamInfoT, name) var resp streamInfoResponse @@ -674,9 +704,9 @@ func (js *jetStream) Stream(ctx context.Context, name string) (Stream, error) { return nil, resp.Error } return &stream{ - jetStream: js, - name: name, - info: resp.StreamInfo, + js: js, + name: name, + info: resp.StreamInfo, }, nil } @@ -685,11 +715,11 @@ func (js *jetStream) DeleteStream(ctx context.Context, name string) error { if err := validateStreamName(name); err != nil { return err } - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } - deleteSubject := apiSubj(js.apiPrefix, fmt.Sprintf(apiStreamDeleteT, name)) + deleteSubject := fmt.Sprintf(apiStreamDeleteT, name) var resp streamDeleteResponse if _, err := js.apiRequestJSON(ctx, deleteSubject, &resp); err != nil { @@ -747,7 +777,7 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord return nil, err } oc := &orderedConsumer{ - jetStream: js, + js: js, cfg: &cfg, stream: stream, namePrefix: nuid.Next(), @@ -810,14 +840,13 @@ func validateSubject(subject string) error { // returned (for a single server setup). For clustered topologies, AccountInfo // will time out. func (js *jetStream) AccountInfo(ctx context.Context) (*AccountInfo, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } var resp accountInfoResponse - infoSubject := apiSubj(js.apiPrefix, apiAccountInfo) - if _, err := js.apiRequestJSON(ctx, infoSubject, &resp); err != nil { + if _, err := js.apiRequestJSON(ctx, apiAccountInfo, &resp); err != nil { if errors.Is(err, nats.ErrNoResponders) { return nil, ErrJetStreamNotEnabled } @@ -853,7 +882,7 @@ func (js *jetStream) ListStreams(ctx context.Context, opts ...StreamListOpt) Str } go func() { defer close(l.streams) - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -906,7 +935,7 @@ func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) Str } } go func() { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -938,14 +967,13 @@ func (js *jetStream) StreamNames(ctx context.Context, opts ...StreamListOpt) Str // subject. If no stream is bound to given subject, ErrStreamNotFound // is returned. func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (string, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } if err := validateSubject(subject); err != nil { return "", err } - streamsSubject := apiSubj(js.apiPrefix, apiStreams) r := &streamsRequest{Subject: subject} req, err := json.Marshal(r) @@ -953,7 +981,7 @@ func (js *jetStream) StreamNameBySubject(ctx context.Context, subject string) (s return "", err } var resp streamNamesResponse - _, err = js.apiRequestJSON(ctx, streamsSubject, &resp, req) + _, err = js.apiRequestJSON(ctx, apiStreams, &resp, req) if err != nil { return "", err } @@ -989,9 +1017,8 @@ func (s *streamLister) streamInfos(ctx context.Context, streamsReq streamsReques return nil, err } - slSubj := apiSubj(s.js.apiPrefix, apiStreamListT) var resp streamListResponse - _, err = s.js.apiRequestJSON(ctx, slSubj, &resp, reqJSON) + _, err = s.js.apiRequestJSON(ctx, apiStreamListT, &resp, reqJSON) if err != nil { return nil, err } @@ -1021,9 +1048,8 @@ func (s *streamLister) streamNames(ctx context.Context, streamsReq streamsReques return nil, err } - slSubj := apiSubj(s.js.apiPrefix, apiStreams) var resp streamNamesResponse - _, err = s.js.apiRequestJSON(ctx, slSubj, &resp, reqJSON) + _, err = s.js.apiRequestJSON(ctx, apiStreams, &resp, reqJSON) if err != nil { return nil, err } @@ -1039,7 +1065,7 @@ func (s *streamLister) streamNames(ctx context.Context, streamsReq streamsReques // wrapContextWithoutDeadline wraps context without deadline with default timeout. // If deadline is already set, it will be returned as is, and cancel() will be nil. // Caller should check if cancel() is nil before calling it. -func wrapContextWithoutDeadline(ctx context.Context) (context.Context, context.CancelFunc) { +func (js *jetStream) wrapContextWithoutDeadline(ctx context.Context) (context.Context, context.CancelFunc) { if _, ok := ctx.Deadline(); ok { return ctx, nil } diff --git a/jetstream/jetstream_options.go b/jetstream/jetstream_options.go index 60878d87e..78cd36c77 100644 --- a/jetstream/jetstream_options.go +++ b/jetstream/jetstream_options.go @@ -30,7 +30,7 @@ func (fn pullOptFunc) configureMessages(opts *consumeOpts) error { // WithClientTrace enables request/response API calls tracing. func WithClientTrace(ct *ClientTrace) JetStreamOpt { - return func(opts *jsOpts) error { + return func(opts *JetStreamOptions) error { opts.clientTrace = ct return nil } @@ -38,7 +38,7 @@ func WithClientTrace(ct *ClientTrace) JetStreamOpt { // WithPublishAsyncErrHandler sets error handler for async message publish. func WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt { - return func(opts *jsOpts) error { + return func(opts *JetStreamOptions) error { opts.publisherOpts.aecb = cb return nil } @@ -47,7 +47,7 @@ func WithPublishAsyncErrHandler(cb MsgErrHandler) JetStreamOpt { // WithPublishAsyncMaxPending sets the maximum outstanding async publishes that // can be inflight at one time. func WithPublishAsyncMaxPending(max int) JetStreamOpt { - return func(opts *jsOpts) error { + return func(opts *JetStreamOptions) error { if max < 1 { return fmt.Errorf("%w: max ack pending should be >= 1", ErrInvalidOption) } diff --git a/jetstream/kv.go b/jetstream/kv.go index 152c7a457..a4bd3954a 100644 --- a/jetstream/kv.go +++ b/jetstream/kv.go @@ -790,13 +790,13 @@ func (kl *kvLister) Error() error { func (js *jetStream) legacyJetStream() (nats.JetStreamContext, error) { opts := make([]nats.JSOpt, 0) - if js.apiPrefix != "" { - opts = append(opts, nats.APIPrefix(js.apiPrefix)) + if js.opts.apiPrefix != "" { + opts = append(opts, nats.APIPrefix(js.opts.apiPrefix)) } - if js.clientTrace != nil { + if js.opts.clientTrace != nil { opts = append(opts, nats.ClientTrace{ - RequestSent: js.clientTrace.RequestSent, - ResponseReceived: js.clientTrace.ResponseReceived, + RequestSent: js.opts.clientTrace.RequestSent, + ResponseReceived: js.opts.clientTrace.ResponseReceived, }) } return js.conn.JetStream(opts...) @@ -928,7 +928,7 @@ func (kv *kvs) Put(ctx context.Context, key string, value []byte) (uint64, error var b strings.Builder if kv.useJSPfx { - b.WriteString(kv.js.apiPrefix) + b.WriteString(kv.js.opts.apiPrefix) } if kv.putPre != "" { b.WriteString(kv.putPre) @@ -978,7 +978,7 @@ func (kv *kvs) Update(ctx context.Context, key string, value []byte, revision ui var b strings.Builder if kv.useJSPfx { - b.WriteString(kv.js.apiPrefix) + b.WriteString(kv.js.opts.apiPrefix) } b.WriteString(kv.pre) b.WriteString(key) @@ -1001,7 +1001,7 @@ func (kv *kvs) Delete(ctx context.Context, key string, opts ...KVDeleteOpt) erro var b strings.Builder if kv.useJSPfx { - b.WriteString(kv.js.apiPrefix) + b.WriteString(kv.js.opts.apiPrefix) } if kv.putPre != "" { b.WriteString(kv.putPre) @@ -1416,7 +1416,7 @@ func mapStreamToKVS(js *jetStream, pushJS nats.JetStreamContext, stream Stream) pushJS: pushJS, stream: stream, // Determine if we need to use the JS prefix in front of Put and Delete operations - useJSPfx: js.apiPrefix != DefaultAPIPrefix, + useJSPfx: js.opts.apiPrefix != DefaultAPIPrefix, useDirect: info.Config.AllowDirect, } diff --git a/jetstream/message.go b/jetstream/message.go index 095f13968..217d2a483 100644 --- a/jetstream/message.go +++ b/jetstream/message.go @@ -342,7 +342,7 @@ func (m *jetStreamMsg) ackReply(ctx context.Context, ackType ackType, sync bool, if sync { var cancel context.CancelFunc - ctx, cancel = wrapContextWithoutDeadline(ctx) + ctx, cancel = m.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } diff --git a/jetstream/ordered.go b/jetstream/ordered.go index 055b5ef46..c0083a4b0 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -28,7 +28,7 @@ import ( type ( orderedConsumer struct { - jetStream *jetStream + js *jetStream cfg *OrderedConsumerConfig stream string currentConsumer *pullConsumer @@ -543,7 +543,7 @@ func (c *orderedConsumer) reset() error { c.currentConsumer.Unlock() go func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - _ = c.jetStream.DeleteConsumer(ctx, c.stream, consName) + _ = c.js.DeleteConsumer(ctx, c.stream, consName) cancel() }() } @@ -568,7 +568,7 @@ func (c *orderedConsumer) reset() error { } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - cons, err = c.jetStream.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig) + cons, err = c.js.CreateOrUpdateConsumer(ctx, c.stream, *consumerConfig) if err != nil { return true, err } @@ -688,10 +688,10 @@ func (c *orderedConsumer) Info(ctx context.Context) (*ConsumerInfo, error) { if c.currentConsumer == nil { return nil, ErrOrderedConsumerNotCreated } - infoSubject := apiSubj(c.jetStream.apiPrefix, fmt.Sprintf(apiConsumerInfoT, c.stream, c.currentConsumer.name)) + infoSubject := fmt.Sprintf(apiConsumerInfoT, c.stream, c.currentConsumer.name) var resp consumerInfoResponse - if _, err := c.jetStream.apiRequestJSON(ctx, infoSubject, &resp); err != nil { + if _, err := c.js.apiRequestJSON(ctx, infoSubject, &resp); err != nil { return nil, err } if resp.Error != nil { diff --git a/jetstream/publish.go b/jetstream/publish.go index 70e219ac4..4c6af5942 100644 --- a/jetstream/publish.go +++ b/jetstream/publish.go @@ -154,7 +154,7 @@ func (js *jetStream) Publish(ctx context.Context, subj string, data []byte, opts // ack from server. It accepts subject name (which must be bound to a // stream) and nats.Message. func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...PublishOpt) (*PubAck, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -290,7 +290,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut if err != nil { return nil, fmt.Errorf("nats: error creating async reply handler: %s", err) } - id = reply[js.replyPrefixLen:] + id = reply[js.opts.replyPrefixLen:] paf = &pubAckFuture{msg: m, jsClient: js.publisher, maxRetries: o.retryAttempts, retryWait: o.retryWait, reply: reply} numPending, maxPending := js.registerPAF(id, paf) @@ -305,7 +305,7 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut } else { // when retrying, get the ID from existing reply subject reply = paf.reply - id = reply[js.replyPrefixLen:] + id = reply[js.opts.replyPrefixLen:] } pubMsg := &nats.Msg{ @@ -337,7 +337,7 @@ func (js *jetStream) newAsyncReply() (string, error) { for i := 0; i < aReplyTokensize; i++ { b[i] = rdigits[int(b[i]%base)] } - js.publisher.replyPrefix = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize]) + js.publisher.replyPrefix = fmt.Sprintf("%s%s.", js.opts.replyPrefix, b[:aReplyTokensize]) sub, err := js.conn.Subscribe(fmt.Sprintf("%s*", js.publisher.replyPrefix), js.handleAsyncReply) if err != nil { js.publisher.Unlock() @@ -365,10 +365,10 @@ func (js *jetStream) newAsyncReply() (string, error) { // Handle an async reply from PublishAsync. func (js *jetStream) handleAsyncReply(m *nats.Msg) { - if len(m.Subject) <= js.replyPrefixLen { + if len(m.Subject) <= js.opts.replyPrefixLen { return } - id := m.Subject[js.replyPrefixLen:] + id := m.Subject[js.opts.replyPrefixLen:] js.publisher.Lock() diff --git a/jetstream/pull.go b/jetstream/pull.go index d0a7138ac..8ee17f2ea 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -81,12 +81,12 @@ type ( pullConsumer struct { sync.Mutex - jetStream *jetStream - stream string - durable bool - name string - info *ConsumerInfo - subs syncx.Map[string, *pullSubscription] + js *jetStream + stream string + durable bool + name string + info *ConsumerInfo + subs syncx.Map[string, *pullSubscription] } pullRequest struct { @@ -187,7 +187,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( } p.Lock() - subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name)) + subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name)) consumeID := nuid.Next() sub := &pullSubscription{ @@ -198,7 +198,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( fetchNext: make(chan *pullRequest, 1), consumeOpts: consumeOpts, } - sub.connStatusChanged = p.jetStream.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING) + sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING) sub.hbMonitor = sub.scheduleHeartbeatCheck(consumeOpts.Heartbeat) @@ -245,7 +245,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( } return } - handler(p.jetStream.toJSMsg(msg)) + handler(p.js.toJSMsg(msg)) sub.Lock() sub.decrementPendingMsgs(msg) sub.incrementDeliveredMsgs() @@ -255,8 +255,8 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( sub.Stop() } } - inbox := p.jetStream.conn.NewInbox() - sub.subscription, err = p.jetStream.conn.Subscribe(inbox, internalHandler) + inbox := p.js.conn.NewInbox() + sub.subscription, err = p.js.conn.Subscribe(inbox, internalHandler) if err != nil { return nil, err } @@ -430,7 +430,7 @@ func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error } p.Lock() - subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name)) + subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name)) msgs := make(chan *nats.Msg, consumeOpts.MaxMessages) @@ -444,9 +444,9 @@ func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error fetchNext: make(chan *pullRequest, 1), consumeOpts: consumeOpts, } - sub.connStatusChanged = p.jetStream.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING) - inbox := p.jetStream.conn.NewInbox() - sub.subscription, err = p.jetStream.conn.ChanSubscribe(inbox, sub.msgs) + sub.connStatusChanged = p.js.conn.StatusChanged(nats.CONNECTED, nats.RECONNECTING) + inbox := p.js.conn.NewInbox() + sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs) if err != nil { p.Unlock() return nil, err @@ -547,7 +547,7 @@ func (s *pullSubscription) Next() (Msg, error) { } s.decrementPendingMsgs(msg) s.incrementDeliveredMsgs() - return s.consumer.jetStream.toJSMsg(msg), nil + return s.consumer.js.toJSMsg(msg), nil case err := <-s.errs: if errors.Is(err, ErrNoHeartbeat) { s.pending.msgCount = 0 @@ -755,7 +755,7 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { msgs: make(chan Msg, req.Batch), } msgs := make(chan *nats.Msg, 2*req.Batch) - subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name)) + subject := p.js.apiSubject(fmt.Sprintf(apiRequestNextT, p.stream, p.name)) sub := &pullSubscription{ consumer: p, @@ -763,9 +763,9 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { msgs: msgs, errs: make(chan error, 1), } - inbox := p.jetStream.conn.NewInbox() + inbox := p.js.conn.NewInbox() var err error - sub.subscription, err = p.jetStream.conn.ChanSubscribe(inbox, sub.msgs) + sub.subscription, err = p.js.conn.ChanSubscribe(inbox, sub.msgs) if err != nil { return nil, err } @@ -799,7 +799,7 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { res.Unlock() continue } - res.msgs <- p.jetStream.toJSMsg(msg) + res.msgs <- p.js.toJSMsg(msg) meta, err := msg.Metadata() if err != nil { res.err = fmt.Errorf("parsing message metadata: %s", err) @@ -938,7 +938,7 @@ func (s *pullSubscription) pull(req *pullRequest, subject string) error { } reply := s.subscription.Subject - if err := s.consumer.jetStream.conn.PublishRequest(subject, reply, reqJSON); err != nil { + if err := s.consumer.js.conn.PublishRequest(subject, reply, reqJSON); err != nil { return err } return nil diff --git a/jetstream/stream.go b/jetstream/stream.go index 1cd9975dc..fbfd3d2f1 100644 --- a/jetstream/stream.go +++ b/jetstream/stream.go @@ -119,9 +119,9 @@ type ( } stream struct { - name string - info *StreamInfo - jetStream *jetStream + name string + info *StreamInfo + js *jetStream } // StreamInfoOpt is a function setting options for [Stream.Info] @@ -241,7 +241,7 @@ type ( // possible). Consumer interface is returned, allowing to operate on a // consumer (e.g. fetch messages). func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { - return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreateOrUpdate) + return upsertConsumer(ctx, s.js, s.name, cfg, consumerActionCreateOrUpdate) } // CreateConsumer creates a consumer on a given stream with given @@ -251,14 +251,14 @@ func (s *stream) CreateOrUpdateConsumer(ctx context.Context, cfg ConsumerConfig) // existing consumer is returned. Consumer interface is returned, // allowing to operate on a consumer (e.g. fetch messages). func (s *stream) CreateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { - return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionCreate) + return upsertConsumer(ctx, s.js, s.name, cfg, consumerActionCreate) } // UpdateConsumer updates an existing consumer. If consumer does not // exist, ErrConsumerDoesNotExist is returned. Consumer interface is // returned, allowing to operate on a consumer (e.g. fetch messages). func (s *stream) UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consumer, error) { - return upsertConsumer(ctx, s.jetStream, s.name, cfg, consumerActionUpdate) + return upsertConsumer(ctx, s.js, s.name, cfg, consumerActionUpdate) } // OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer @@ -267,7 +267,7 @@ func (s *stream) UpdateConsumer(ctx context.Context, cfg ConsumerConfig) (Consum // pull consumers and are resilient to deletes and restarts. func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) { oc := &orderedConsumer{ - jetStream: s.jetStream, + js: s.js, cfg: &cfg, stream: s.name, namePrefix: nuid.Next(), @@ -287,18 +287,18 @@ func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) // of messages. If consumer does not exist, ErrConsumerNotFound is // returned. func (s *stream) Consumer(ctx context.Context, name string) (Consumer, error) { - return getConsumer(ctx, s.jetStream, s.name, name) + return getConsumer(ctx, s.js, s.name, name) } // DeleteConsumer removes a consumer with given name from a stream. // If consumer does not exist, ErrConsumerNotFound is returned. func (s *stream) DeleteConsumer(ctx context.Context, name string) error { - return deleteConsumer(ctx, s.jetStream, s.name, name) + return deleteConsumer(ctx, s.js, s.name, name) } // Info returns StreamInfo from the server. func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -316,7 +316,7 @@ func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, var subjectMap map[string]uint64 var offset int - infoSubject := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiStreamInfoT, s.name)) + infoSubject := fmt.Sprintf(apiStreamInfoT, s.name) var info *StreamInfo for { if infoReq != nil { @@ -332,7 +332,7 @@ func (s *stream) Info(ctx context.Context, opts ...StreamInfoOpt) (*StreamInfo, } } var resp streamInfoResponse - if _, err = s.jetStream.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil { + if _, err = s.js.apiRequestJSON(ctx, infoSubject, &resp, req); err != nil { return nil, err } if resp.Error != nil { @@ -375,7 +375,7 @@ func (s *stream) CachedInfo() *StreamInfo { // Purge removes messages from a stream. It is a destructive operation. // Use with caution. See StreamPurgeOpt for available options. func (s *stream) Purge(ctx context.Context, opts ...StreamPurgeOpt) error { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -392,10 +392,10 @@ func (s *stream) Purge(ctx context.Context, opts ...StreamPurgeOpt) error { return err } - purgeSubject := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiStreamPurgeT, s.name)) + purgeSubject := fmt.Sprintf(apiStreamPurgeT, s.name) var resp streamPurgeResponse - if _, err = s.jetStream.apiRequestJSON(ctx, purgeSubject, &resp, req); err != nil { + if _, err = s.js.apiRequestJSON(ctx, purgeSubject, &resp, req); err != nil { return err } if resp.Error != nil { @@ -423,7 +423,7 @@ func (s *stream) GetLastMsgForSubject(ctx context.Context, subject string) (*Raw } func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStreamMsg, error) { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -436,24 +436,24 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream // handle direct gets if s.info.Config.AllowDirect { if mreq.LastFor != "" { - gmSubj = apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiDirectMsgGetLastBySubjectT, s.name, mreq.LastFor)) - r, err := s.jetStream.apiRequest(ctx, gmSubj, nil) + gmSubj = fmt.Sprintf(apiDirectMsgGetLastBySubjectT, s.name, mreq.LastFor) + r, err := s.js.apiRequest(ctx, gmSubj, nil) if err != nil { return nil, err } - return convertDirectGetMsgResponseToMsg(s.name, r.msg) + return convertDirectGetMsgResponseToMsg(r.msg) } - gmSubj = apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiDirectMsgGetT, s.name)) - r, err := s.jetStream.apiRequest(ctx, gmSubj, req) + gmSubj = fmt.Sprintf(apiDirectMsgGetT, s.name) + r, err := s.js.apiRequest(ctx, gmSubj, req) if err != nil { return nil, err } - return convertDirectGetMsgResponseToMsg(s.name, r.msg) + return convertDirectGetMsgResponseToMsg(r.msg) } var resp apiMsgGetResponse - dsSubj := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiMsgGetT, s.name)) - _, err = s.jetStream.apiRequestJSON(ctx, dsSubj, &resp, req) + dsSubj := fmt.Sprintf(apiMsgGetT, s.name) + _, err = s.js.apiRequestJSON(ctx, dsSubj, &resp, req) if err != nil { return nil, err } @@ -484,7 +484,7 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream }, nil } -func convertDirectGetMsgResponseToMsg(name string, r *nats.Msg) (*RawStreamMsg, error) { +func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) { // Check for 404/408. We would get a no-payload message and a "Status" header if len(r.Data) == 0 { val := r.Header.Get(statusHdr) @@ -555,7 +555,7 @@ func (s *stream) SecureDeleteMsg(ctx context.Context, seq uint64) error { } func (s *stream) deleteMsg(ctx context.Context, req *msgDeleteRequest) error { - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -563,9 +563,9 @@ func (s *stream) deleteMsg(ctx context.Context, req *msgDeleteRequest) error { if err != nil { return err } - subj := apiSubj(s.jetStream.apiPrefix, fmt.Sprintf(apiMsgDeleteT, s.name)) + subj := fmt.Sprintf(apiMsgDeleteT, s.name) var resp msgDeleteResponse - if _, err = s.jetStream.apiRequestJSON(ctx, subj, &resp, r); err != nil { + if _, err = s.js.apiRequestJSON(ctx, subj, &resp, r); err != nil { return err } if !resp.Success { @@ -578,12 +578,12 @@ func (s *stream) deleteMsg(ctx context.Context, req *msgDeleteRequest) error { // channel of consumer infos. func (s *stream) ListConsumers(ctx context.Context) ConsumerInfoLister { l := &consumerLister{ - js: s.jetStream, + js: s.js, consumers: make(chan *ConsumerInfo), } go func() { defer close(l.consumers) - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -625,12 +625,12 @@ func (s *consumerLister) Err() error { // channel of consumer names. func (s *stream) ConsumerNames(ctx context.Context) ConsumerNameLister { l := &consumerLister{ - js: s.jetStream, + js: s.js, names: make(chan string), } go func() { defer close(l.names) - ctx, cancel := wrapContextWithoutDeadline(ctx) + ctx, cancel := s.js.wrapContextWithoutDeadline(ctx) if cancel != nil { defer cancel() } @@ -674,7 +674,7 @@ func (s *consumerLister) consumerInfos(ctx context.Context, stream string) ([]*C return nil, err } - slSubj := apiSubj(s.js.apiPrefix, fmt.Sprintf(apiConsumerListT, stream)) + slSubj := fmt.Sprintf(apiConsumerListT, stream) var resp consumerListResponse _, err = s.js.apiRequestJSON(ctx, slSubj, &resp, req) if err != nil { @@ -702,7 +702,7 @@ func (s *consumerLister) consumerNames(ctx context.Context, stream string) ([]st return nil, err } - slSubj := apiSubj(s.js.apiPrefix, fmt.Sprintf(apiConsumerNamesT, stream)) + slSubj := fmt.Sprintf(apiConsumerNamesT, stream) var resp consumerNamesResponse _, err = s.js.apiRequestJSON(ctx, slSubj, &resp, req) if err != nil { diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index 5278863e2..86addad6f 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -81,6 +81,13 @@ func TestNewWithAPIPrefix(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + opts := jsTest.Options() + if opts.APIPrefix != "main" { + t.Fatalf("Invalid API prefix; want: %v, got: %v", "main", opts.APIPrefix) + } + if opts.Domain != "" { + t.Fatalf("Invalid domain; want: %v, got: %v", "", opts.Domain) + } _, err = jsTest.Publish(ctx, "foo", []byte("msg")) if err != nil { @@ -134,6 +141,14 @@ func TestNewWithDomain(t *testing.T) { t.Errorf("Invalid domain; want %v, got: %v", "ABC", accInfo.Domain) } + opts := js.Options() + if opts.APIPrefix != "" { + t.Fatalf("Invalid API prefix; want: %v, got: %v", "main", opts.APIPrefix) + } + if opts.Domain != "ABC" { + t.Fatalf("Invalid domain; want: %v, got: %v", "", opts.Domain) + } + _, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: "TEST", Subjects: []string{"foo"}, @@ -164,6 +179,32 @@ func TestNewWithDomain(t *testing.T) { }) } +func TestJetStreamOptionsReadOnly(t *testing.T) { + srv := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, srv) + nc, err := nats.Connect(srv.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := jetstream.New(nc) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + opts := js.Options() + opts.APIPrefix = "foo" + opts.Domain = "bar" + + opts = js.Options() + if opts.APIPrefix != "" { + t.Fatalf("Invalid API prefix; want: %v, got: %v", "", opts.APIPrefix) + } + if opts.Domain != "" { + t.Fatalf("Invalid domain; want: %v, got: %v", "", opts.Domain) + } +} + func TestWithClientTrace(t *testing.T) { srv := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, srv)