Skip to content

Commit

Permalink
fix: add tracing outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Nov 22, 2021
1 parent 1f1167b commit f47d2f7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 3 deletions.
10 changes: 8 additions & 2 deletions examples/todomvc/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,28 @@ func main() {
}

// Create the outbox that will project and publish events.
outbox, err := mongoOutbox.NewOutbox(mongodbURI, db)
var outbox eh.Outbox

mongoOutbox, err := mongoOutbox.NewOutbox(mongodbURI, db)
if err != nil {
log.Fatalf("could not create outbox: %s", err)
}

outbox = tracing.NewOutbox(mongoOutbox)

go func() {
for err := range outbox.Errors() {
log.Print("outbox:", err)
}
}()

outbox.Start()

// Create the event store.
var eventStore eh.EventStore

if eventStore, err = mongoEventStore.NewEventStoreWithClient(
outbox.Client(), db,
mongoOutbox.Client(), db,
mongoEventStore.WithEventHandlerInTX(outbox),
); err != nil {
log.Fatal("could not create event store: ", err)
Expand Down
2 changes: 1 addition & 1 deletion tracing/eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

// NOTE: Not named "Integration" to enable running with the unit tests.
func TestAddHandler(t *testing.T) {
func TestEventBusAddHandler(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
Expand Down
54 changes: 54 additions & 0 deletions tracing/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"context"

eh "github.com/looplab/eventhorizon"
)

// Outbox is an event bus wrapper that adds tracing.
type Outbox struct {
eh.Outbox
h eh.EventHandler
}

// NewOutbox creates a Outbox.
func NewOutbox(outbox eh.Outbox) *Outbox {
return &Outbox{
Outbox: outbox,
// Wrap the eh.EventHandler part of the bus with tracing middleware,
// set as producer to set the correct tags.
h: eh.UseEventHandlerMiddleware(outbox, NewEventHandlerMiddleware()),
}
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *Outbox) HandleEvent(ctx context.Context, event eh.Event) error {
return b.h.HandleEvent(ctx, event)
}

// AddHandler implements the AddHandler method of the eventhorizon.Outbox interface.
func (b *Outbox) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
if h == nil {
return eh.ErrMissingHandler
}

// Wrap the handlers in tracing middleware.
h = eh.UseEventHandlerMiddleware(h, NewEventHandlerMiddleware())

return b.Outbox.AddHandler(ctx, m, h)
}
92 changes: 92 additions & 0 deletions tracing/outbox_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2020 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"context"
"testing"
"time"

"github.com/looplab/eventhorizon/outbox"
"github.com/looplab/eventhorizon/outbox/memory"
)

// NOTE: Not named "Integration" to enable running with the unit tests.
func TestOutboxAddHandler(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

innerOutbox, err := memory.NewOutbox()
if err != nil {
t.Fatal("there should be no error:", err)
}

o := NewOutbox(innerOutbox)
if o == nil {
t.Fatal("there should be an outbox")
}

outbox.TestAddHandler(t, o, context.Background())
}

// NOTE: Not named "Integration" to enable running with the unit tests.
func TestOutbox(t *testing.T) {
// Shorter sweeps for testing
memory.PeriodicSweepInterval = 2 * time.Second
memory.PeriodicSweepAge = 2 * time.Second

innerOutbox, err := memory.NewOutbox()
if err != nil {
t.Fatal("there should be no error:", err)
}

o := NewOutbox(innerOutbox)
if o == nil {
t.Fatal("there should be an outbox")
}

o.Start()

outbox.AcceptanceTest(t, o, context.Background(), "none")

if err := o.Close(); err != nil {
t.Error("there should be no error:", err)
}
}

func BenchmarkOutbox(b *testing.B) {
// Shorter sweeps for testing
memory.PeriodicSweepInterval = 2 * time.Second
memory.PeriodicSweepAge = 2 * time.Second

innerOutbox, err := memory.NewOutbox()
if err != nil {
b.Fatal("there should be no error:", err)
}

o := NewOutbox(innerOutbox)
if o == nil {
b.Fatal("there should be an outbox")
}

o.Start()

outbox.Benchmark(b, o)

if err := o.Close(); err != nil {
b.Error("there should be no error:", err)
}
}

0 comments on commit f47d2f7

Please sign in to comment.