Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Support for disabling queue groups at service, group, and endpoint levels #1797

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions micro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ _ = numbersGroup.AddEndpoint("multiply", micro.HandlerFunc(multiplyHandler))
## Customizing queue groups

For each service, group and endpoint the queue group used to gather responses
can be customized. If not provided a default queue group will be used (`q`).
Customizing queue groups can be useful to e.g. implement fanout request pattern
or hedged request pattern (to reduce tail latencies by only waiting for the
first response for multiple service instances).
can be customized or disabled. If not provided a default queue group will be
used (`q`). Customizing queue groups can be useful to e.g. implement fanout
request pattern or hedged request pattern (to reduce tail latencies by only
waiting for the first response for multiple service instances).

Let's say we have multiple services listening on the same subject, but with
different queue groups:
Expand Down Expand Up @@ -154,6 +154,27 @@ Queue groups can be overwritten by setting them on groups and endpoints as well:
g.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q3"))
```

Similarly, queue groups can be disabled on service config, group and endpoint levels. If disabled,
a standard NATS subscription will be created for the endpoint.

```go
// disable queue group for the service
srv, _ := micro.AddService(nc, micro.Config{
Name: "EchoService",
Version: "1.0.0",
QueueGroupDisabled: true,
})

// create a group with queue group disabled
srv.AddGroup("g", micro.WithEndpointQueueGroupDisabled())

// create an endpoint with queue group disabled
srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroupDisabled())
```

When disabling queue groups, same inheritance rules apply as for customizing
queue groups. (service config -> group -> endpoint)

## Discovery and Monitoring

Each service is assigned a unique ID on creation. A service instance is
Expand Down
113 changes: 77 additions & 36 deletions micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,12 @@ type (
subject string
metadata map[string]string
queueGroup string
qgDisabled bool
}

groupOpts struct {
queueGroup string
qgDisabled bool
}

// ErrHandler is a function used to configure a custom error handler for a service,
Expand Down Expand Up @@ -156,9 +158,10 @@ type (
}

group struct {
service *service
prefix string
queueGroup string
service *service
prefix string
queueGroup string
queueGroupDisabled bool
}

// Verb represents a name of the monitoring service.
Expand Down Expand Up @@ -186,6 +189,9 @@ type (
// QueueGroup can be used to override the default queue group name.
QueueGroup string `json:"queue_group"`

// QueueGroupDisabled disables the queue group for the service.
QueueGroupDisabled bool `json:"queue_group_disabled"`

// StatsHandler is a user-defined custom function.
// used to calculate additional service stats.
StatsHandler StatsHandler
Expand All @@ -209,6 +215,9 @@ type (

// QueueGroup can be used to override the default queue group name.
QueueGroup string `json:"queue_group"`

// QueueGroupDisabled disables the queue group for the endpoint.
QueueGroupDisabled bool `json:"queue_group_disabled"`
}

// NATSError represents an error returned by a NATS Subscription.
Expand Down Expand Up @@ -401,11 +410,11 @@ func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt)
if options.subject != "" {
subject = options.subject
}
queueGroup := queueGroupName(options.queueGroup, s.Config.QueueGroup)
return addEndpoint(s, name, subject, handler, options.metadata, queueGroup)
queueGroup, noQueue := resolveQueueGroup(options.queueGroup, s.Config.QueueGroup, options.qgDisabled, s.Config.QueueGroupDisabled)
return addEndpoint(s, name, subject, handler, options.metadata, queueGroup, noQueue)
}

func addEndpoint(s *service, name, subject string, handler Handler, metadata map[string]string, queueGroup string) error {
func addEndpoint(s *service, name, subject string, handler Handler, metadata map[string]string, queueGroup string, noQueue bool) error {
if !nameRegexp.MatchString(name) {
return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation)
}
Expand All @@ -418,21 +427,34 @@ func addEndpoint(s *service, name, subject string, handler Handler, metadata map
endpoint := &Endpoint{
service: s,
EndpointConfig: EndpointConfig{
Subject: subject,
Handler: handler,
Metadata: metadata,
QueueGroup: queueGroup,
Subject: subject,
Handler: handler,
Metadata: metadata,
QueueGroup: queueGroup,
QueueGroupDisabled: noQueue,
},
Name: name,
}

sub, err := s.nc.QueueSubscribe(
subject,
queueGroup,
func(m *nats.Msg) {
s.reqHandler(endpoint, &request{msg: m})
},
)
var sub *nats.Subscription
var err error

if !noQueue {
sub, err = s.nc.QueueSubscribe(
subject,
queueGroup,
func(m *nats.Msg) {
s.reqHandler(endpoint, &request{msg: m})
},
)
} else {
sub, err = s.nc.Subscribe(
subject,
func(m *nats.Msg) {
s.reqHandler(endpoint, &request{msg: m})
},
)
}
if err != nil {
return err
}
Expand All @@ -453,11 +475,12 @@ func (s *service) AddGroup(name string, opts ...GroupOpt) Group {
for _, opt := range opts {
opt(&o)
}
queueGroup := queueGroupName(o.queueGroup, s.Config.QueueGroup)
queueGroup, noQueue := resolveQueueGroup(o.queueGroup, s.Config.QueueGroup, o.qgDisabled, s.Config.QueueGroupDisabled)
return &group{
service: s,
prefix: name,
queueGroup: queueGroup,
service: s,
prefix: name,
queueGroup: queueGroup,
queueGroupDisabled: noQueue,
}
}

Expand Down Expand Up @@ -801,29 +824,33 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e
if g.prefix == "" {
endpointSubject = subject
}
queueGroup := queueGroupName(options.queueGroup, g.queueGroup)
queueGroup, noQueue := resolveQueueGroup(options.queueGroup, g.queueGroup, options.qgDisabled, g.queueGroupDisabled)

return addEndpoint(g.service, name, endpointSubject, handler, options.metadata, queueGroup)
return addEndpoint(g.service, name, endpointSubject, handler, options.metadata, queueGroup, noQueue)
}

func queueGroupName(customQG, parentQG string) string {
queueGroup := customQG
if queueGroup == "" {
if parentQG != "" {
queueGroup = parentQG
} else {
queueGroup = DefaultQueueGroup
}
func resolveQueueGroup(customQG, parentQG string, disabled, parentDisabled bool) (string, bool) {
if disabled {
return "", true
}
if customQG != "" {
return customQG, false
}
if parentDisabled {
return "", true
}
return queueGroup
if parentQG != "" {
return parentQG, false
}
return DefaultQueueGroup, false
}

func (g *group) AddGroup(name string, opts ...GroupOpt) Group {
var o groupOpts
for _, opt := range opts {
opt(&o)
}
queueGroup := queueGroupName(o.queueGroup, g.queueGroup)
queueGroup, noQueue := resolveQueueGroup(o.queueGroup, g.queueGroup, o.qgDisabled, g.queueGroupDisabled)

parts := make([]string, 0, 2)
if g.prefix != "" {
Expand All @@ -835,9 +862,10 @@ func (g *group) AddGroup(name string, opts ...GroupOpt) Group {
prefix := strings.Join(parts, ".")

return &group{
service: g.service,
prefix: prefix,
queueGroup: queueGroup,
service: g.service,
prefix: prefix,
queueGroup: queueGroup,
queueGroupDisabled: noQueue,
}
}

Expand Down Expand Up @@ -911,8 +939,21 @@ func WithEndpointQueueGroup(queueGroup string) EndpointOpt {
}
}

func WithEndpointQueueGroupDisabled() EndpointOpt {
return func(e *endpointOpts) error {
e.qgDisabled = true
return nil
}
}

func WithGroupQueueGroup(queueGroup string) GroupOpt {
return func(g *groupOpts) {
g.queueGroup = queueGroup
}
}

func WithGroupQueueGroupDisabled() GroupOpt {
return func(g *groupOpts) {
g.qgDisabled = true
}
}
115 changes: 110 additions & 5 deletions micro/test/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,59 @@ func TestCustomQueueGroup(t *testing.T) {
"baz": "custom",
},
},
{
name: "disable queue group on service config",
endpointInit: func(t *testing.T, nc *nats.Conn) micro.Service {
srv, err := micro.AddService(nc, micro.Config{
Name: "test_service",
Version: "0.0.1",
QueueGroupDisabled: true,
Endpoint: &micro.EndpointConfig{
Subject: "foo",
Handler: micro.HandlerFunc(func(r micro.Request) {}),
},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add endpoint on service directly, should have inherited disabled queue group
err = srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add group with queue group from service config
g1 := srv.AddGroup("g1")

// add endpoint on group, should have queue group disabled
err = g1.AddEndpoint("baz", micro.HandlerFunc(func(r micro.Request) {}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add endpoint on a service with queue group enabled
err = srv.AddEndpoint("qux", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-qux"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

// add endpoint on group and set custom queue group
err = g1.AddEndpoint("quux", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-quux"))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

return srv
},
expectedQueueGroups: map[string]string{
"default": "",
"bar": "",
"baz": "",
"qux": "q-qux",
"quux": "q-quux",
},
},
{
name: "overwriting queue groups",
endpointInit: func(t *testing.T, nc *nats.Conn) micro.Service {
Expand All @@ -1598,6 +1651,9 @@ func TestCustomQueueGroup(t *testing.T) {
// overwrite parent group queue group
g3 := g2.AddGroup("g3", micro.WithGroupQueueGroup("q-g3"))

// disable queue group on group
g4 := g2.AddGroup("g4", micro.WithGroupQueueGroupDisabled())

// add endpoint on service directly, overwriting the queue group
err = srv.AddEndpoint("bar", micro.HandlerFunc(func(r micro.Request) {}), micro.WithEndpointQueueGroup("q-bar"))
if err != nil {
Expand All @@ -1621,14 +1677,20 @@ func TestCustomQueueGroup(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

err = g4.AddEndpoint("foo-disabled", micro.HandlerFunc(func(r micro.Request) {}))
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return srv
},
expectedQueueGroups: map[string]string{
"default": "q-default",
"bar": "q-bar",
"baz": "q-g1",
"qux": "q-qux",
"quux": "q-g3",
"default": "q-default",
"bar": "q-bar",
"baz": "q-g1",
"qux": "q-qux",
"quux": "q-g3",
"foo-disabled": "",
},
},
{
Expand Down Expand Up @@ -1805,3 +1867,46 @@ func TestCustomQueueGroupMultipleResponses(t *testing.T) {
}
}
}

func TestDisableQueueGroup(t *testing.T) {
s := RunServerOnPort(-1)
defer s.Shutdown()

nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()
wg := sync.WaitGroup{}

// Create 5 service responders.
config := micro.Config{
Name: "CoolAddService",
Version: "0.1.0",
Description: "Add things together",
Metadata: map[string]string{"basic": "metadata"},
Endpoint: &micro.EndpointConfig{
Subject: "svc.add",
Handler: micro.HandlerFunc(func(r micro.Request) {
r.Respond(nil)
wg.Done()
}),
},
QueueGroupDisabled: true,
}

for range 10 {
srv, err := micro.AddService(nc, config)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer srv.Stop()
}
wg.Add(10)
// Send a request to the service.
if err = nc.PublishRequest("svc.add", "rply", []byte("req")); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
wg.Wait()

}