Skip to content

Commit

Permalink
Fix leaking goroutines in Service API
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Feb 4, 2025
1 parent dee04fa commit 036dcbb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
22 changes: 19 additions & 3 deletions micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type (

asyncCallbacksHandler struct {
cbQueue chan func()
closed bool
}
)

Expand Down Expand Up @@ -351,6 +352,7 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
opts = append(opts, WithEndpointQueueGroup(config.QueueGroup))
}
if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil {
svc.asyncDispatcher.close()
return nil, err
}
}
Expand Down Expand Up @@ -462,8 +464,8 @@ func (s *service) AddGroup(name string, opts ...GroupOpt) Group {
// dispatch is responsible for calling any async callbacks
func (ac *asyncCallbacksHandler) run() {
for {
f := <-ac.cbQueue
if f == nil {
f, ok := <-ac.cbQueue
if !ok || f == nil {
return
}
f()
Expand All @@ -476,7 +478,11 @@ func (ac *asyncCallbacksHandler) push(f func()) {
}

func (ac *asyncCallbacksHandler) close() {
if ac.closed {
return
}
close(ac.cbQueue)
ac.closed = true
}

func (c *Config) valid() error {
Expand Down Expand Up @@ -565,6 +571,9 @@ func (s *service) wrapConnectionEventCallbacks() {
}

func unwrapConnectionEventCallbacks(nc *nats.Conn, handlers handlers) {
if nc.IsClosed() {
return
}
nc.SetClosedHandler(handlers.closed)
nc.SetErrorHandler(handlers.asyncErr)
}
Expand Down Expand Up @@ -666,13 +675,18 @@ func (s *service) Stop() error {
}
for _, e := range s.endpoints {
if err := e.stop(); err != nil {
fmt.Println("Error stopping endpoint: ", err)
return err
}
}
var keys []string
for key, sub := range s.verbSubs {
keys = append(keys, key)
if err := sub.Drain(); err != nil {
// connection is closed so draining is not possible
if errors.Is(err, nats.ErrConnectionClosed) {
break
}
return fmt.Errorf("draining subscription for subject %q: %w", sub.Subject, err)
}
}
Expand Down Expand Up @@ -828,7 +842,9 @@ func (g *group) AddGroup(name string, opts ...GroupOpt) Group {
}

func (e *Endpoint) stop() error {
if err := e.subscription.Drain(); err != nil {
// Drain the subscription. If the connection is closed, draining is not possible
// but we should still remove the endpoint from the service.
if err := e.subscription.Drain(); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
return fmt.Errorf("draining subscription for request handler: %w", err)
}
for i := 0; i < len(e.service.endpoints); i++ {
Expand Down
6 changes: 6 additions & 0 deletions micro/test/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ func TestAddService(t *testing.T) {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()
// cleanup handlers since we invoke them manually
defer nc.SetClosedHandler(nil)
defer nc.SetErrorHandler(nil)

srv, err := micro.AddService(nc, test.givenConfig)
if test.withError != nil {
Expand Down Expand Up @@ -1389,6 +1392,9 @@ func TestRequestRespond(t *testing.T) {
Header: nats.Header{"key": []string{"value"}},
}, 50*time.Millisecond)
if test.withRespondError != nil {
if err == nil {
t.Fatalf("Expected error when receiving response")
}
return
}
if err != nil {
Expand Down

0 comments on commit 036dcbb

Please sign in to comment.