diff --git a/micro/README.md b/micro/README.md index 3ca09a562..a516dbf91 100644 --- a/micro/README.md +++ b/micro/README.md @@ -98,10 +98,10 @@ _ = numbersGroup.AddEndpoint("multiply", micro.HandlerFunc(multiplyHandler)) ## Customizing queue groups For each service, group and endpoint the queue group used to gather responses -can be customized. If not provided a default queue group will be used (`q`). -Customizing queue groups can be useful to e.g. implement fanout request pattern -or hedged request pattern (to reduce tail latencies by only waiting for the -first response for multiple service instances). +can be customized or disabled. If not provided a default queue group will be +used (`q`). Customizing queue groups can be useful to e.g. implement fanout +request pattern or hedged request pattern (to reduce tail latencies by only +waiting for the first response for multiple service instances). Let's say we have multiple services listening on the same subject, but with different queue groups: @@ -154,6 +154,27 @@ Queue groups can be overwritten by setting them on groups and endpoints as well: g.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q3")) ``` +Similarly, queue groups can be disabled on service config, group and endpoint levels. If disabled, +a standard NATS subscription will be created for the endpoint. + +```go + // disable queue group for the service + srv, _ := micro.AddService(nc, micro.Config{ + Name: "EchoService", + Version: "1.0.0", + QueueGroupDisabled: true, + }) + + // create a group with queue group disabled + srv.AddGroup("g", micro.WithEndpointQueueGroupDisabled()) + + // create an endpoint with queue group disabled + srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroupDisabled()) +``` + +When disabling queue groups, same inheritance rules apply as for customizing +queue groups. (service config -> group -> endpoint) + ## Discovery and Monitoring Each service is assigned a unique ID on creation. A service instance is diff --git a/micro/service.go b/micro/service.go index f75825dd3..045729c6b 100644 --- a/micro/service.go +++ b/micro/service.go @@ -77,10 +77,12 @@ type ( subject string metadata map[string]string queueGroup string + qgDisabled bool } groupOpts struct { queueGroup string + qgDisabled bool } // ErrHandler is a function used to configure a custom error handler for a service, @@ -156,9 +158,10 @@ type ( } group struct { - service *service - prefix string - queueGroup string + service *service + prefix string + queueGroup string + queueGroupDisabled bool } // Verb represents a name of the monitoring service. @@ -186,6 +189,9 @@ type ( // QueueGroup can be used to override the default queue group name. QueueGroup string `json:"queue_group"` + // QueueGroupDisabled disables the queue group for the service. + QueueGroupDisabled bool `json:"queue_group_disabled"` + // StatsHandler is a user-defined custom function. // used to calculate additional service stats. StatsHandler StatsHandler @@ -209,6 +215,9 @@ type ( // QueueGroup can be used to override the default queue group name. QueueGroup string `json:"queue_group"` + + // QueueGroupDisabled disables the queue group for the endpoint. + QueueGroupDisabled bool `json:"queue_group_disabled"` } // NATSError represents an error returned by a NATS Subscription. @@ -401,11 +410,11 @@ func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) if options.subject != "" { subject = options.subject } - queueGroup := queueGroupName(options.queueGroup, s.Config.QueueGroup) - return addEndpoint(s, name, subject, handler, options.metadata, queueGroup) + queueGroup, noQueue := resolveQueueGroup(options.queueGroup, s.Config.QueueGroup, options.qgDisabled, s.Config.QueueGroupDisabled) + return addEndpoint(s, name, subject, handler, options.metadata, queueGroup, noQueue) } -func addEndpoint(s *service, name, subject string, handler Handler, metadata map[string]string, queueGroup string) error { +func addEndpoint(s *service, name, subject string, handler Handler, metadata map[string]string, queueGroup string, noQueue bool) error { if !nameRegexp.MatchString(name) { return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation) } @@ -418,21 +427,34 @@ func addEndpoint(s *service, name, subject string, handler Handler, metadata map endpoint := &Endpoint{ service: s, EndpointConfig: EndpointConfig{ - Subject: subject, - Handler: handler, - Metadata: metadata, - QueueGroup: queueGroup, + Subject: subject, + Handler: handler, + Metadata: metadata, + QueueGroup: queueGroup, + QueueGroupDisabled: noQueue, }, Name: name, } - sub, err := s.nc.QueueSubscribe( - subject, - queueGroup, - func(m *nats.Msg) { - s.reqHandler(endpoint, &request{msg: m}) - }, - ) + var sub *nats.Subscription + var err error + + if !noQueue { + sub, err = s.nc.QueueSubscribe( + subject, + queueGroup, + func(m *nats.Msg) { + s.reqHandler(endpoint, &request{msg: m}) + }, + ) + } else { + sub, err = s.nc.Subscribe( + subject, + func(m *nats.Msg) { + s.reqHandler(endpoint, &request{msg: m}) + }, + ) + } if err != nil { return err } @@ -453,11 +475,12 @@ func (s *service) AddGroup(name string, opts ...GroupOpt) Group { for _, opt := range opts { opt(&o) } - queueGroup := queueGroupName(o.queueGroup, s.Config.QueueGroup) + queueGroup, noQueue := resolveQueueGroup(o.queueGroup, s.Config.QueueGroup, o.qgDisabled, s.Config.QueueGroupDisabled) return &group{ - service: s, - prefix: name, - queueGroup: queueGroup, + service: s, + prefix: name, + queueGroup: queueGroup, + queueGroupDisabled: noQueue, } } @@ -801,21 +824,25 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e if g.prefix == "" { endpointSubject = subject } - queueGroup := queueGroupName(options.queueGroup, g.queueGroup) + queueGroup, noQueue := resolveQueueGroup(options.queueGroup, g.queueGroup, options.qgDisabled, g.queueGroupDisabled) - return addEndpoint(g.service, name, endpointSubject, handler, options.metadata, queueGroup) + return addEndpoint(g.service, name, endpointSubject, handler, options.metadata, queueGroup, noQueue) } -func queueGroupName(customQG, parentQG string) string { - queueGroup := customQG - if queueGroup == "" { - if parentQG != "" { - queueGroup = parentQG - } else { - queueGroup = DefaultQueueGroup - } +func resolveQueueGroup(customQG, parentQG string, disabled, parentDisabled bool) (string, bool) { + if disabled { + return "", true + } + if customQG != "" { + return customQG, false + } + if parentDisabled { + return "", true } - return queueGroup + if parentQG != "" { + return parentQG, false + } + return DefaultQueueGroup, false } func (g *group) AddGroup(name string, opts ...GroupOpt) Group { @@ -823,7 +850,7 @@ func (g *group) AddGroup(name string, opts ...GroupOpt) Group { for _, opt := range opts { opt(&o) } - queueGroup := queueGroupName(o.queueGroup, g.queueGroup) + queueGroup, noQueue := resolveQueueGroup(o.queueGroup, g.queueGroup, o.qgDisabled, g.queueGroupDisabled) parts := make([]string, 0, 2) if g.prefix != "" { @@ -835,9 +862,10 @@ func (g *group) AddGroup(name string, opts ...GroupOpt) Group { prefix := strings.Join(parts, ".") return &group{ - service: g.service, - prefix: prefix, - queueGroup: queueGroup, + service: g.service, + prefix: prefix, + queueGroup: queueGroup, + queueGroupDisabled: noQueue, } } @@ -911,8 +939,21 @@ func WithEndpointQueueGroup(queueGroup string) EndpointOpt { } } +func WithEndpointQueueGroupDisabled() EndpointOpt { + return func(e *endpointOpts) error { + e.qgDisabled = true + return nil + } +} + func WithGroupQueueGroup(queueGroup string) GroupOpt { return func(g *groupOpts) { g.queueGroup = queueGroup } } + +func WithGroupQueueGroupDisabled() GroupOpt { + return func(g *groupOpts) { + g.qgDisabled = true + } +} diff --git a/micro/test/service_test.go b/micro/test/service_test.go index 5c8c22c9c..ac3c63877 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -1574,6 +1574,59 @@ func TestCustomQueueGroup(t *testing.T) { "baz": "custom", }, }, + { + name: "disable queue group on service config", + endpointInit: func(t *testing.T, nc *nats.Conn) micro.Service { + srv, err := micro.AddService(nc, micro.Config{ + Name: "test_service", + Version: "0.0.1", + QueueGroupDisabled: true, + Endpoint: µ.EndpointConfig{ + Subject: "foo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // add endpoint on service directly, should have inherited disabled queue group + err = srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // add group with queue group from service config + g1 := srv.AddGroup("g1") + + // add endpoint on group, should have queue group disabled + err = g1.AddEndpoint("baz", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // add endpoint on a service with queue group enabled + err = srv.AddEndpoint("qux", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-qux")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // add endpoint on group and set custom queue group + err = g1.AddEndpoint("quux", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-quux")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + return srv + }, + expectedQueueGroups: map[string]string{ + "default": "", + "bar": "", + "baz": "", + "qux": "q-qux", + "quux": "q-quux", + }, + }, { name: "overwriting queue groups", endpointInit: func(t *testing.T, nc *nats.Conn) micro.Service { @@ -1598,6 +1651,9 @@ func TestCustomQueueGroup(t *testing.T) { // overwrite parent group queue group g3 := g2.AddGroup("g3", micro.WithGroupQueueGroup("q-g3")) + // disable queue group on group + g4 := g2.AddGroup("g4", micro.WithGroupQueueGroupDisabled()) + // add endpoint on service directly, overwriting the queue group err = srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-bar")) if err != nil { @@ -1621,14 +1677,20 @@ func TestCustomQueueGroup(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + + err = g4.AddEndpoint("foo-disabled", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } return srv }, expectedQueueGroups: map[string]string{ - "default": "q-default", - "bar": "q-bar", - "baz": "q-g1", - "qux": "q-qux", - "quux": "q-g3", + "default": "q-default", + "bar": "q-bar", + "baz": "q-g1", + "qux": "q-qux", + "quux": "q-g3", + "foo-disabled": "", }, }, { @@ -1805,3 +1867,46 @@ func TestCustomQueueGroupMultipleResponses(t *testing.T) { } } } + +func TestDisableQueueGroup(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + wg := sync.WaitGroup{} + + // Create 5 service responders. + config := micro.Config{ + Name: "CoolAddService", + Version: "0.1.0", + Description: "Add things together", + Metadata: map[string]string{"basic": "metadata"}, + Endpoint: µ.EndpointConfig{ + Subject: "svc.add", + Handler: micro.HandlerFunc(func(r micro.Request) { + r.Respond(nil) + wg.Done() + }), + }, + QueueGroupDisabled: true, + } + + for range 10 { + srv, err := micro.AddService(nc, config) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer srv.Stop() + } + wg.Add(10) + // Send a request to the service. + if err = nc.PublishRequest("svc.add", "rply", []byte("req")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + wg.Wait() + +}