From 1f1167b37d905a9322a634fe1ea5c48fa1f235de Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Mon, 22 Nov 2021 10:36:56 +0100 Subject: [PATCH 1/2] refactor: add tracing package to consolidate --- examples/todomvc/backend/main.go | 44 +++++++++++++++---- .../commandhandler.go | 4 +- {eventbus/tracing => tracing}/context.go | 0 {eventbus/tracing => tracing}/eventbus.go | 5 +-- .../tracing => tracing}/eventbus_test.go | 0 .../middleware.go => tracing/eventhandler.go | 4 +- {eventstore/tracing => tracing}/eventstore.go | 0 .../tracing => tracing}/eventstore_test.go | 0 {repo/tracing => tracing}/repo.go | 0 {repo/tracing => tracing}/repo_test.go | 0 10 files changed, 41 insertions(+), 16 deletions(-) rename middleware/commandhandler/tracing/middleware.go => tracing/commandhandler.go (89%) rename {eventbus/tracing => tracing}/context.go (100%) rename {eventbus/tracing => tracing}/eventbus.go (88%) rename {eventbus/tracing => tracing}/eventbus_test.go (100%) rename middleware/eventhandler/tracing/middleware.go => tracing/eventhandler.go (91%) rename {eventstore/tracing => tracing}/eventstore.go (100%) rename {eventstore/tracing => tracing}/eventstore_test.go (100%) rename {repo/tracing => tracing}/repo.go (100%) rename {repo/tracing => tracing}/repo_test.go (100%) diff --git a/examples/todomvc/backend/main.go b/examples/todomvc/backend/main.go index d64b5191..6998b653 100644 --- a/examples/todomvc/backend/main.go +++ b/examples/todomvc/backend/main.go @@ -26,17 +26,13 @@ import ( eh "github.com/looplab/eventhorizon" "github.com/looplab/eventhorizon/commandhandler/bus" - redisEventBus "github.com/looplab/eventhorizon/eventbus/redis" - tracingEventBus "github.com/looplab/eventhorizon/eventbus/tracing" mongoEventStore "github.com/looplab/eventhorizon/eventstore/mongodb" - tracingEventStore "github.com/looplab/eventhorizon/eventstore/tracing" - "github.com/looplab/eventhorizon/middleware/commandhandler/tracing" "github.com/looplab/eventhorizon/middleware/eventhandler/observer" mongoOutbox "github.com/looplab/eventhorizon/outbox/mongodb" mongoRepo "github.com/looplab/eventhorizon/repo/mongodb" - tracingRepo "github.com/looplab/eventhorizon/repo/tracing" "github.com/looplab/eventhorizon/repo/version" + "github.com/looplab/eventhorizon/tracing" "github.com/looplab/eventhorizon/uuid" "github.com/looplab/eventhorizon/examples/todomvc/backend/domains/todo" @@ -51,6 +47,7 @@ func main() { if mongodbAddr == "" { mongodbAddr = "localhost:27017" } + mongodbURI := "mongodb://" + mongodbAddr // Connect to localhost if not running inside docker @@ -70,7 +67,9 @@ func main() { if _, err := rand.Read(b); err != nil { log.Fatal("could not get random DB name:", err) } + db := "todomvc-" + hex.EncodeToString(b) + log.Println("using DB:", db) traceCloser, err := NewTracer(db, tracingURL) @@ -83,6 +82,7 @@ func main() { if err != nil { log.Fatalf("could not create outbox: %s", err) } + go func() { for err := range outbox.Errors() { log.Print("outbox:", err) @@ -91,24 +91,28 @@ func main() { // Create the event store. var eventStore eh.EventStore + if eventStore, err = mongoEventStore.NewEventStoreWithClient( outbox.Client(), db, mongoEventStore.WithEventHandlerInTX(outbox), ); err != nil { log.Fatal("could not create event store: ", err) } - eventStore = tracingEventStore.NewEventStore(eventStore) + + eventStore = tracing.NewEventStore(eventStore) // Create an command bus. commandBus := bus.NewCommandHandler() // Create the repository and wrap in a version repository. var todoRepo eh.ReadWriteRepo + if todoRepo, err = mongoRepo.NewRepo(mongodbURI, db, "todos"); err != nil { log.Fatal("could not create invitation repository: ", err) } + todoRepo = version.NewRepo(todoRepo) - todoRepo = tracingRepo.NewRepo(todoRepo) + todoRepo = tracing.NewRepo(todoRepo) // Setup the Todo domain. if err := todo.SetupDomain(commandBus, eventStore, outbox, todoRepo); err != nil { @@ -119,15 +123,18 @@ func main() { // Create the event bus that distributes events. var eventBus eh.EventBus + if eventBus, err = redisEventBus.NewEventBus(redisAddr, db, "backend"); err != nil { log.Fatal("could not create event bus: ", err) } + go func() { for err := range eventBus.Errors() { log.Print("eventbus:", err) } }() - eventBus = tracingEventBus.NewEventBus(eventBus) + + eventBus = tracing.NewEventBus(eventBus) if err := outbox.AddHandler(ctx, eh.MatchAll{}, eventBus); err != nil { log.Fatal("could not add event bus to outbox:", err) } @@ -144,7 +151,7 @@ func main() { // Add tracing middleware to init tracing spans, and the logging middleware. commandHandler := eh.UseCommandHandlerMiddleware(commandBus, - tracing.NewMiddleware(), + tracing.NewCommandHandlerMiddleware(), CommandLogger, ) @@ -153,15 +160,18 @@ func main() { if err != nil { log.Fatal("could not create handler: ", err) } + srv := &http.Server{ Addr: ":8080", Handler: h, } srvClosed := make(chan struct{}) + go func() { if err := srv.ListenAndServe(); err != http.ErrServerClosed { log.Fatal("could not listen HTTP: ", err) } + close(srvClosed) }() @@ -182,25 +192,32 @@ func main() { <-sigint log.Println("waiting for HTTP server to close") + if err := srv.Shutdown(context.Background()); err != nil { log.Print("could not shutdown HTTP server: ", err) } + <-srvClosed // Cancel all handlers and wait. log.Println("waiting for handlers to finish") + if err := eventBus.Close(); err != nil { log.Print("could not close event bus: ", err) } + if err := outbox.Close(); err != nil { log.Print("could not close outbox: ", err) } + if err := todoRepo.Close(); err != nil { log.Print("could not close todo repo: ", err) } + if err := eventStore.Close(); err != nil { log.Print("could not close event store: ", err) } + if err := traceCloser.Close(); err != nil { log.Print("could not close tracer: ", err) } @@ -212,6 +229,7 @@ func main() { func CommandLogger(h eh.CommandHandler) eh.CommandHandler { return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { log.Printf("CMD: %#v", cmd) + return h.HandleCommand(ctx, cmd) }) } @@ -227,12 +245,14 @@ func (l *EventLogger) HandlerType() eh.EventHandlerType { // HandleEvent implements the HandleEvent method of the EventHandler interface. func (l *EventLogger) HandleEvent(ctx context.Context, event eh.Event) error { log.Printf("EVENT: %s", event) + return nil } func seedExample(h eh.CommandHandler, todoRepo eh.ReadRepo) { cmdCtx := context.Background() id := uuid.New() + if err := h.HandleCommand(cmdCtx, &todo.Create{ ID: id, }); err != nil { @@ -246,17 +266,21 @@ func seedExample(h eh.CommandHandler, todoRepo eh.ReadRepo) { }); err != nil { log.Fatal("there should be no error: ", err) } + if err := h.HandleCommand(cmdCtx, &todo.AddItem{ ID: id, Description: "Run the TodoMVC example", }); err != nil { log.Fatal("there should be no error: ", err) } + findCtx, cancelFind := version.NewContextWithMinVersionWait(cmdCtx, 3) if _, err := todoRepo.Find(findCtx, id); err != nil { log.Fatal("could not find created todo list: ", err) } + cancelFind() + if err := h.HandleCommand(cmdCtx, &todo.CheckAllItems{ ID: id, }); err != nil { @@ -270,12 +294,14 @@ func seedExample(h eh.CommandHandler, todoRepo eh.ReadRepo) { }); err != nil { log.Fatal("there should be no error: ", err) } + if err := h.HandleCommand(cmdCtx, &todo.AddItem{ ID: id, Description: "Read the Event Horizon source", }); err != nil { log.Fatal("there should be no error: ", err) } + if err := h.HandleCommand(cmdCtx, &todo.AddItem{ ID: id, Description: "Create a PR", diff --git a/middleware/commandhandler/tracing/middleware.go b/tracing/commandhandler.go similarity index 89% rename from middleware/commandhandler/tracing/middleware.go rename to tracing/commandhandler.go index 3ea20e5d..293eca60 100644 --- a/middleware/commandhandler/tracing/middleware.go +++ b/tracing/commandhandler.go @@ -23,8 +23,8 @@ import ( "github.com/opentracing/opentracing-go/ext" ) -// NewMiddleware returns a new command handler middleware that adds tracing spans. -func NewMiddleware() eh.CommandHandlerMiddleware { +// NewCommandHandlerMiddleware returns a new command handler middleware that adds tracing spans. +func NewCommandHandlerMiddleware() eh.CommandHandlerMiddleware { return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler { return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { opName := fmt.Sprintf("Command(%s)", cmd.CommandType()) diff --git a/eventbus/tracing/context.go b/tracing/context.go similarity index 100% rename from eventbus/tracing/context.go rename to tracing/context.go diff --git a/eventbus/tracing/eventbus.go b/tracing/eventbus.go similarity index 88% rename from eventbus/tracing/eventbus.go rename to tracing/eventbus.go index bba60d98..e6ddb1e5 100644 --- a/eventbus/tracing/eventbus.go +++ b/tracing/eventbus.go @@ -18,7 +18,6 @@ import ( "context" eh "github.com/looplab/eventhorizon" - "github.com/looplab/eventhorizon/middleware/eventhandler/tracing" ) // EventBus is an event bus wrapper that adds tracing. @@ -33,7 +32,7 @@ func NewEventBus(eventBus eh.EventBus) *EventBus { EventBus: eventBus, // Wrap the eh.EventHandler part of the bus with tracing middleware, // set as producer to set the correct tags. - h: eh.UseEventHandlerMiddleware(eventBus, tracing.NewMiddleware()), + h: eh.UseEventHandlerMiddleware(eventBus, NewEventHandlerMiddleware()), } } @@ -49,7 +48,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event } // Wrap the handlers in tracing middleware. - h = eh.UseEventHandlerMiddleware(h, tracing.NewMiddleware()) + h = eh.UseEventHandlerMiddleware(h, NewEventHandlerMiddleware()) return b.EventBus.AddHandler(ctx, m, h) } diff --git a/eventbus/tracing/eventbus_test.go b/tracing/eventbus_test.go similarity index 100% rename from eventbus/tracing/eventbus_test.go rename to tracing/eventbus_test.go diff --git a/middleware/eventhandler/tracing/middleware.go b/tracing/eventhandler.go similarity index 91% rename from middleware/eventhandler/tracing/middleware.go rename to tracing/eventhandler.go index 30fc64ab..6191412d 100644 --- a/middleware/eventhandler/tracing/middleware.go +++ b/tracing/eventhandler.go @@ -23,8 +23,8 @@ import ( "github.com/opentracing/opentracing-go/ext" ) -// NewMiddleware returns an event handler middleware that adds tracing spans. -func NewMiddleware() eh.EventHandlerMiddleware { +// NewEventHandlerMiddleware returns an event handler middleware that adds tracing spans. +func NewEventHandlerMiddleware() eh.EventHandlerMiddleware { return eh.EventHandlerMiddleware(func(h eh.EventHandler) eh.EventHandler { return &eventHandler{h} }) diff --git a/eventstore/tracing/eventstore.go b/tracing/eventstore.go similarity index 100% rename from eventstore/tracing/eventstore.go rename to tracing/eventstore.go diff --git a/eventstore/tracing/eventstore_test.go b/tracing/eventstore_test.go similarity index 100% rename from eventstore/tracing/eventstore_test.go rename to tracing/eventstore_test.go diff --git a/repo/tracing/repo.go b/tracing/repo.go similarity index 100% rename from repo/tracing/repo.go rename to tracing/repo.go diff --git a/repo/tracing/repo_test.go b/tracing/repo_test.go similarity index 100% rename from repo/tracing/repo_test.go rename to tracing/repo_test.go From f47d2f740012acdc178ae9b49dcca44d1f4829f2 Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Mon, 22 Nov 2021 12:35:59 +0100 Subject: [PATCH 2/2] fix: add tracing outbox --- examples/todomvc/backend/main.go | 10 +++- tracing/eventbus_test.go | 2 +- tracing/outbox.go | 54 +++++++++++++++++++ tracing/outbox_test.go | 92 ++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 tracing/outbox.go create mode 100644 tracing/outbox_test.go diff --git a/examples/todomvc/backend/main.go b/examples/todomvc/backend/main.go index 6998b653..a8b51858 100644 --- a/examples/todomvc/backend/main.go +++ b/examples/todomvc/backend/main.go @@ -78,22 +78,28 @@ func main() { } // Create the outbox that will project and publish events. - outbox, err := mongoOutbox.NewOutbox(mongodbURI, db) + var outbox eh.Outbox + + mongoOutbox, err := mongoOutbox.NewOutbox(mongodbURI, db) if err != nil { log.Fatalf("could not create outbox: %s", err) } + outbox = tracing.NewOutbox(mongoOutbox) + go func() { for err := range outbox.Errors() { log.Print("outbox:", err) } }() + outbox.Start() + // Create the event store. var eventStore eh.EventStore if eventStore, err = mongoEventStore.NewEventStoreWithClient( - outbox.Client(), db, + mongoOutbox.Client(), db, mongoEventStore.WithEventHandlerInTX(outbox), ); err != nil { log.Fatal("could not create event store: ", err) diff --git a/tracing/eventbus_test.go b/tracing/eventbus_test.go index 9ed6d9bc..6cf99f4f 100644 --- a/tracing/eventbus_test.go +++ b/tracing/eventbus_test.go @@ -23,7 +23,7 @@ import ( ) // NOTE: Not named "Integration" to enable running with the unit tests. -func TestAddHandler(t *testing.T) { +func TestEventBusAddHandler(t *testing.T) { if testing.Short() { t.Skip("skipping integration test") } diff --git a/tracing/outbox.go b/tracing/outbox.go new file mode 100644 index 00000000..3e7d9455 --- /dev/null +++ b/tracing/outbox.go @@ -0,0 +1,54 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + + eh "github.com/looplab/eventhorizon" +) + +// Outbox is an event bus wrapper that adds tracing. +type Outbox struct { + eh.Outbox + h eh.EventHandler +} + +// NewOutbox creates a Outbox. +func NewOutbox(outbox eh.Outbox) *Outbox { + return &Outbox{ + Outbox: outbox, + // Wrap the eh.EventHandler part of the bus with tracing middleware, + // set as producer to set the correct tags. + h: eh.UseEventHandlerMiddleware(outbox, NewEventHandlerMiddleware()), + } +} + +// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface. +func (b *Outbox) HandleEvent(ctx context.Context, event eh.Event) error { + return b.h.HandleEvent(ctx, event) +} + +// AddHandler implements the AddHandler method of the eventhorizon.Outbox interface. +func (b *Outbox) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error { + if h == nil { + return eh.ErrMissingHandler + } + + // Wrap the handlers in tracing middleware. + h = eh.UseEventHandlerMiddleware(h, NewEventHandlerMiddleware()) + + return b.Outbox.AddHandler(ctx, m, h) +} diff --git a/tracing/outbox_test.go b/tracing/outbox_test.go new file mode 100644 index 00000000..dc441a01 --- /dev/null +++ b/tracing/outbox_test.go @@ -0,0 +1,92 @@ +// Copyright (c) 2020 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "context" + "testing" + "time" + + "github.com/looplab/eventhorizon/outbox" + "github.com/looplab/eventhorizon/outbox/memory" +) + +// NOTE: Not named "Integration" to enable running with the unit tests. +func TestOutboxAddHandler(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + innerOutbox, err := memory.NewOutbox() + if err != nil { + t.Fatal("there should be no error:", err) + } + + o := NewOutbox(innerOutbox) + if o == nil { + t.Fatal("there should be an outbox") + } + + outbox.TestAddHandler(t, o, context.Background()) +} + +// NOTE: Not named "Integration" to enable running with the unit tests. +func TestOutbox(t *testing.T) { + // Shorter sweeps for testing + memory.PeriodicSweepInterval = 2 * time.Second + memory.PeriodicSweepAge = 2 * time.Second + + innerOutbox, err := memory.NewOutbox() + if err != nil { + t.Fatal("there should be no error:", err) + } + + o := NewOutbox(innerOutbox) + if o == nil { + t.Fatal("there should be an outbox") + } + + o.Start() + + outbox.AcceptanceTest(t, o, context.Background(), "none") + + if err := o.Close(); err != nil { + t.Error("there should be no error:", err) + } +} + +func BenchmarkOutbox(b *testing.B) { + // Shorter sweeps for testing + memory.PeriodicSweepInterval = 2 * time.Second + memory.PeriodicSweepAge = 2 * time.Second + + innerOutbox, err := memory.NewOutbox() + if err != nil { + b.Fatal("there should be no error:", err) + } + + o := NewOutbox(innerOutbox) + if o == nil { + b.Fatal("there should be an outbox") + } + + o.Start() + + outbox.Benchmark(b, o) + + if err := o.Close(); err != nil { + b.Error("there should be no error:", err) + } +}