Skip to content

Commit

Permalink
Merge pull request etcd-io#19166 from joshuazh-x/fix-embed-close-dead…
Browse files Browse the repository at this point in the history
…lock-3.4

[3.4] Avoid deadlock in etcd.Close when stopping during bootstrapping
  • Loading branch information
serathius authored Jan 13, 2025
2 parents e34100c + a3bf49b commit ab34cdb
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 4 deletions.
2 changes: 1 addition & 1 deletion embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
if !serving {
// errored before starting gRPC server for serveCtx.serversC
for _, sctx := range e.sctxs {
close(sctx.serversC)
sctx.close()
}
}
e.Close()
Expand Down
21 changes: 18 additions & 3 deletions embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package embed

import (
"context"
"errors"
"fmt"
"io/ioutil"
defaultLog "log"
"net"
"net/http"
"strings"
"sync"

"go.etcd.io/etcd/etcdserver"
"go.etcd.io/etcd/etcdserver/api/v3client"
Expand Down Expand Up @@ -63,6 +65,7 @@ type serveCtx struct {
userHandlers map[string]http.Handler
serviceRegister func(*grpc.Server)
serversC chan *servers
closeOnce sync.Once
}

type servers struct {
Expand Down Expand Up @@ -94,7 +97,15 @@ func (sctx *serveCtx) serve(
splitHttp bool,
gopts ...grpc.ServerOption) (err error) {
logger := defaultLog.New(ioutil.Discard, "etcdhttp", 0)
<-s.ReadyNotify()

// Make sure serversC is closed even if we prematurely exit the function.
defer sctx.close()

select {
case <-s.StoppingNotify():
return errors.New("server is stopping")
case <-s.ReadyNotify():
}

if sctx.lg != nil {
sctx.lg.Info("ready to serve client requests")
Expand All @@ -113,8 +124,6 @@ func (sctx *serveCtx) serve(
servElection := v3election.NewElectionServer(v3c)
servLock := v3lock.NewLockServer(v3c)

// Make sure serversC is closed even if we prematurely exit the function.
defer close(sctx.serversC)
var gwmux *gw.ServeMux
if s.Cfg.EnableGRPCGateway {
// GRPC gateway connects to grpc server via connection provided by grpc dial.
Expand Down Expand Up @@ -549,3 +558,9 @@ func (sctx *serveCtx) registerTrace() {
evf := func(w http.ResponseWriter, r *http.Request) { trace.RenderEvents(w, r, true) }
sctx.registerUserHandler("/debug/events", http.HandlerFunc(evf))
}

func (sctx *serveCtx) close() {
sctx.closeOnce.Do(func() {
close(sctx.serversC)
})
}
5 changes: 5 additions & 0 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,10 @@ func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
// when the server is stopped.
func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }

// StoppingNotify returns a channel that receives an empty struct
// when the server is being stopped.
func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }

func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }

func (s *EtcdServer) LeaderStats() []byte {
Expand Down Expand Up @@ -2163,6 +2167,7 @@ func (s *EtcdServer) publish(timeout time.Duration) {
Val: string(b),
}

// gofail: var beforePublishing struct{}
for {
ctx, cancel := context.WithTimeout(s.ctx, timeout)
_, err := s.Do(ctx, req)
Expand Down
56 changes: 56 additions & 0 deletions integration/embed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
gofail "go.etcd.io/gofail/runtime"
)

func TestEmbedEtcd(t *testing.T) {
Expand Down Expand Up @@ -196,3 +199,56 @@ func setupEmbedCfg(cfg *embed.Config, curls []url.URL, purls []url.URL) {
}
cfg.InitialCluster = cfg.InitialCluster[1:]
}

func TestEmbedEtcdStopDuringBootstrapping(t *testing.T) {
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}

fpName := "beforePublishing"
require.NoError(t, gofail.Enable(fpName, `sleep("2s")`))
t.Cleanup(func() {
terr := gofail.Disable(fpName)
if terr != nil && terr != gofail.ErrDisabled {
t.Fatalf("failed to disable %s: %v", fpName, terr)
}
})

done := make(chan struct{})
go func() {
defer close(done)

cfg := embed.NewConfig()
urls := newEmbedURLs(false, 2)
setupEmbedCfg(cfg, []url.URL{urls[0]}, []url.URL{urls[1]})
cfg.Dir = filepath.Join(t.TempDir(), "embed-etcd")

e, err := embed.StartEtcd(cfg)
if err != nil {
t.Errorf("Failed to start etcd, got error %v", err)
}
defer e.Close()

go func() {
time.Sleep(time.Second)
e.Server.Stop()
t.Log("Stopped server during bootstrapping")
}()

select {
case <-e.Server.ReadyNotify():
t.Log("Server is ready!")
case <-e.Server.StopNotify():
t.Log("Server is stopped")
case <-time.After(20 * time.Second):
e.Server.Stop() // trigger a shutdown
t.Error("Server took too long to start!")
}
}()

select {
case <-done:
case <-time.After(10 * time.Second):
t.Error("timeout in bootstrapping etcd")
}
}

0 comments on commit ab34cdb

Please sign in to comment.