diff --git a/Makefile b/Makefile index ec6dcd90..808487ed 100644 --- a/Makefile +++ b/Makefile @@ -6,27 +6,27 @@ lint: .PHONY: test test: - go test -v -race -short ./... + go test -race -short ./... .PHONY: test_cover test_cover: - go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -v -race -short -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c + go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -race -short -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c go run ./hack/coverage/coverage.go . unit.coverprofile @find . -name \.coverprofile -type f -delete .PHONY: test_integration test_integration: - go test -v -race -run Integration ./... + go test -race -run Integration ./... .PHONY: test_integration_cover test_integration_cover: - go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -v -race -run Integration -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c + go list -f '{{if len .TestGoFiles}}"cd {{.Dir}} && go test -race -run Integration -coverprofile={{.Dir}}/.coverprofile {{.ImportPath}}"{{end}}' ./... | xargs -L 1 sh -c go run ./hack/coverage/coverage.go . integration.coverprofile @find . -name \.coverprofile -type f -delete .PHONY: test_loadtest test_loadtest: - go test -race -v -run Loadtest ./... + go test -race -run Loadtest ./... .PHONY: test_all_docker test_all_docker: diff --git a/codec.go b/codec.go index d31fadbb..53923bde 100644 --- a/codec.go +++ b/codec.go @@ -23,3 +23,11 @@ type EventCodec interface { // UnmarshalEvent unmarshals an event and supported parts of context from bytes. UnmarshalEvent(context.Context, []byte) (Event, context.Context, error) } + +// CommandCodec is a codec for marshaling and unmarshaling commands to and from bytes. +type CommandCodec interface { + // MarshalCommand marshals a command and the supported parts of context into bytes. + MarshalCommand(context.Context, Command) ([]byte, error) + // UnmarshalCommand unmarshals a command and supported parts of context from bytes. + UnmarshalCommand(context.Context, []byte) (Command, context.Context, error) +} diff --git a/codec/acceptance_testing.go b/codec/acceptance_testing.go index 7dc8f843..bfdc8634 100644 --- a/codec/acceptance_testing.go +++ b/codec/acceptance_testing.go @@ -16,6 +16,7 @@ package codec import ( "context" + "reflect" "testing" "time" @@ -26,21 +27,27 @@ import ( func init() { eh.RegisterEventData(EventType, func() eh.EventData { return &EventData{} }) + + eh.RegisterCommand(func() eh.Command { return &Command{} }) } const ( // EventType is a the type for Event. EventType eh.EventType = "CodecEvent" + // AggregateType is the type for Aggregate. + AggregateType eh.AggregateType = "CodecAggregate" + // CommandType is the type for Command. + CommandType eh.CommandType = "CodecCommand" ) // EventCodecAcceptanceTest is the acceptance test that all implementations of -// Codec should pass. It should manually be called from a test case in each +// EventCodec should pass. It should manually be called from a test case in each // implementation: // // func TestEventCodec(t *testing.T) { // c := EventCodec{} // expectedBytes = []byte("") -// eventbus.AcceptanceTest(t, c, expectedBytes) +// codec.EventCodecAcceptanceTest(t, c, expectedBytes) // } // func EventCodecAcceptanceTest(t *testing.T, c eh.EventCodec, expectedBytes []byte) { @@ -117,3 +124,85 @@ type Nested struct { String string Number float64 } + +// CommandCodecAcceptanceTest is the acceptance test that all implementations of +// CommandCodec should pass. It should manually be called from a test case in each +// implementation: +// +// func TestCommandCodec(t *testing.T) { +// c := CommandCodec{} +// expectedBytes = []byte("") +// codec.CommandCodecAcceptanceTest(t, c, expectedBytes) +// } +// +func CommandCodecAcceptanceTest(t *testing.T, c eh.CommandCodec, expectedBytes []byte) { + // Marshaling. + ctx := mocks.WithContextOne(context.Background(), "testval") + id := uuid.MustParse("10a7ec0f-7f2b-46f5-bca1-877b6e33c9fd") + timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + cmd := &Command{ + ID: id, + Bool: true, + String: "string", + Number: 42.0, + Slice: []string{"a", "b"}, + Map: map[string]interface{}{"key": "value"}, // NOTE: Just one key to avoid compare issues. + Time: timestamp, + TimeRef: ×tamp, + Struct: Nested{ + Bool: true, + String: "string", + Number: 42.0, + }, + StructRef: &Nested{ + Bool: true, + String: "string", + Number: 42.0, + }, + } + + b, err := c.MarshalCommand(ctx, cmd) + if err != nil { + t.Error("there should be no error:", err) + } + + if string(b) != string(expectedBytes) { + t.Error("the encoded bytes should be correct:", string(b)) + } + + // Unmarshaling. + decodedCmd, decodedContext, err := c.UnmarshalCommand(context.Background(), b) + if err != nil { + t.Error("there should be no error:", err) + } + + if !reflect.DeepEqual(decodedCmd, cmd) { + t.Error("the decoded command was incorrect:", err) + } + + if val, ok := mocks.ContextOne(decodedContext); !ok || val != "testval" { + t.Error("the decoded context was incorrect:", decodedContext) + } +} + +// Command is a mocked eventhorizon.Command, useful in testing. +type Command struct { + ID uuid.UUID + Bool bool + String string + Number float64 + Slice []string + Map map[string]interface{} + Time time.Time + TimeRef *time.Time + NullTime *time.Time + Struct Nested + StructRef *Nested + NullStruct *Nested +} + +var _ = eh.Command(&Command{}) + +func (t *Command) AggregateID() uuid.UUID { return t.ID } +func (t *Command) AggregateType() eh.AggregateType { return AggregateType } +func (t *Command) CommandType() eh.CommandType { return CommandType } diff --git a/codec/bson/command.go b/codec/bson/command.go new file mode 100644 index 00000000..9e359a58 --- /dev/null +++ b/codec/bson/command.go @@ -0,0 +1,76 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bson + +import ( + "context" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + + eh "github.com/looplab/eventhorizon" +) + +// CommandCodec is a codec for marshaling and unmarshaling commands +// to and from bytes in BSON format. +type CommandCodec struct{} + +// MarshalCommand marshals a command into bytes in BSON format. +func (_ CommandCodec) MarshalCommand(ctx context.Context, cmd eh.Command) ([]byte, error) { + c := command{ + CommandType: cmd.CommandType(), + Context: eh.MarshalContext(ctx), + } + + var err error + if c.Command, err = bson.Marshal(cmd); err != nil { + return nil, fmt.Errorf("could not marshal command data: %w", err) + } + + b, err := bson.Marshal(c) + if err != nil { + return nil, fmt.Errorf("could not marshal command: %w", err) + } + + return b, nil +} + +// UnmarshalCommand unmarshals a command from bytes in BSON format. +func (_ CommandCodec) UnmarshalCommand(ctx context.Context, b []byte) (eh.Command, context.Context, error) { + var c command + if err := bson.Unmarshal(b, &c); err != nil { + return nil, nil, fmt.Errorf("could not unmarshal command: %w", err) + } + + cmd, err := eh.CreateCommand(c.CommandType) + if err != nil { + return nil, nil, fmt.Errorf("could not create command: %w", err) + } + + if err := bson.Unmarshal(c.Command, cmd); err != nil { + return nil, nil, fmt.Errorf("could not unmarshal command data: %w", err) + } + + ctx = eh.UnmarshalContext(ctx, c.Context) + + return cmd, ctx, nil +} + +// command is the internal structure used on the wire only. +type command struct { + CommandType eh.CommandType `bson:"command_type"` + Command bson.Raw `bson:"command"` + Context map[string]interface{} `bson:"context"` +} diff --git a/codec/bson/command_test.go b/codec/bson/command_test.go new file mode 100644 index 00000000..d5b1115f --- /dev/null +++ b/codec/bson/command_test.go @@ -0,0 +1,33 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bson + +import ( + "encoding/base64" + "testing" + + "github.com/looplab/eventhorizon/codec" +) + +func TestCommandCodec(t *testing.T) { + c := &CommandCodec{} + + expectedBytes, err := base64.StdEncoding.DecodeString("jQEAAAJjb21tYW5kX3R5cGUADQAAAENvZGVjQ29tbWFuZAADY29tbWFuZAA5AQAAAmlkACUAAAAxMGE3ZWMwZi03ZjJiLTQ2ZjUtYmNhMS04NzdiNmUzM2M5ZmQACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVABHNsaWNlABcAAAACMAACAAAAYQACMQACAAAAYgAAA21hcAAUAAAAAmtleQAGAAAAdmFsdWUAAAl0aW1lAIA1U+AkAQAACXRpbWVyZWYAgDVT4CQBAAAKbnVsbHRpbWUAA3N0cnVjdAAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAANzdHJ1Y3RyZWYALwAAAAhib29sAAECc3RyaW5nAAcAAABzdHJpbmcAAW51bWJlcgAAAAAAAABFQAAKbnVsbHN0cnVjdAAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==") + if err != nil { + t.Error("could not decode expected bytes:", err) + } + + codec.CommandCodecAcceptanceTest(t, c, expectedBytes) +} diff --git a/codec/bson/codec.go b/codec/bson/event.go similarity index 100% rename from codec/bson/codec.go rename to codec/bson/event.go diff --git a/codec/bson/codec_test.go b/codec/bson/event_test.go similarity index 100% rename from codec/bson/codec_test.go rename to codec/bson/event_test.go index 2c9c4e89..c8002874 100644 --- a/codec/bson/codec_test.go +++ b/codec/bson/event_test.go @@ -23,8 +23,8 @@ import ( func TestEventCodec(t *testing.T) { c := &EventCodec{} - expectedBytes, err := base64.StdEncoding.DecodeString("4QEAAAJldmVudF90eXBlAAsAAABDb2RlY0V2ZW50AANkYXRhAAwBAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAEc2xpY2UAFwAAAAIwAAIAAABhAAIxAAIAAABiAAADbWFwABQAAAACa2V5AAYAAAB2YWx1ZQAACXRpbWUAgDVT4CQBAAAJdGltZXJlZgCANVPgJAEAAApudWxsdGltZQADc3RydWN0AC8AAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAAA3N0cnVjdHJlZgAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAApudWxsc3RydWN0AAAJdGltZXN0YW1wAIA1U+AkAQAAAmFnZ3JlZ2F0ZV90eXBlAAoAAABBZ2dyZWdhdGUAAl9pZAAlAAAAMTBhN2VjMGYtN2YyYi00NmY1LWJjYTEtODc3YjZlMzNjOWZkABB2ZXJzaW9uAAEAAAADbWV0YWRhdGEAEgAAAAFudW0AAAAAAAAARUAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==") + expectedBytes, err := base64.StdEncoding.DecodeString("4QEAAAJldmVudF90eXBlAAsAAABDb2RlY0V2ZW50AANkYXRhAAwBAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAEc2xpY2UAFwAAAAIwAAIAAABhAAIxAAIAAABiAAADbWFwABQAAAACa2V5AAYAAAB2YWx1ZQAACXRpbWUAgDVT4CQBAAAJdGltZXJlZgCANVPgJAEAAApudWxsdGltZQADc3RydWN0AC8AAAAIYm9vbAABAnN0cmluZwAHAAAAc3RyaW5nAAFudW1iZXIAAAAAAAAARUAAA3N0cnVjdHJlZgAvAAAACGJvb2wAAQJzdHJpbmcABwAAAHN0cmluZwABbnVtYmVyAAAAAAAAAEVAAApudWxsc3RydWN0AAAJdGltZXN0YW1wAIA1U+AkAQAAAmFnZ3JlZ2F0ZV90eXBlAAoAAABBZ2dyZWdhdGUAAl9pZAAlAAAAMTBhN2VjMGYtN2YyYi00NmY1LWJjYTEtODc3YjZlMzNjOWZkABB2ZXJzaW9uAAEAAAADbWV0YWRhdGEAEgAAAAFudW0AAAAAAAAARUAAA2NvbnRleHQAHgAAAAJjb250ZXh0X29uZQAIAAAAdGVzdHZhbAAAAA==") if err != nil { t.Error("could not decode expected bytes:", err) } diff --git a/codec/json/command.go b/codec/json/command.go new file mode 100644 index 00000000..8fa6c0d5 --- /dev/null +++ b/codec/json/command.go @@ -0,0 +1,75 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package json + +import ( + "context" + "encoding/json" + "fmt" + + eh "github.com/looplab/eventhorizon" +) + +// CommandCodec is a codec for marshaling and unmarshaling commands +// to and from bytes in JSON format. +type CommandCodec struct{} + +// MarshalCommand marshals a command into bytes in JSON format. +func (_ CommandCodec) MarshalCommand(ctx context.Context, cmd eh.Command) ([]byte, error) { + c := command{ + CommandType: cmd.CommandType(), + Context: eh.MarshalContext(ctx), + } + + var err error + if c.Command, err = json.Marshal(cmd); err != nil { + return nil, fmt.Errorf("could not marshal command data: %w", err) + } + + b, err := json.Marshal(c) + if err != nil { + return nil, fmt.Errorf("could not marshal command: %w", err) + } + + return b, nil +} + +// UnmarshalCommand unmarshals a command from bytes in JSON format. +func (_ CommandCodec) UnmarshalCommand(ctx context.Context, b []byte) (eh.Command, context.Context, error) { + var c command + if err := json.Unmarshal(b, &c); err != nil { + return nil, nil, fmt.Errorf("could not unmarshal command: %w", err) + } + + cmd, err := eh.CreateCommand(c.CommandType) + if err != nil { + return nil, nil, fmt.Errorf("could not create command: %w", err) + } + + if err := json.Unmarshal(c.Command, &cmd); err != nil { + return nil, nil, fmt.Errorf("could not unmarshal command data: %w", err) + } + + ctx = eh.UnmarshalContext(ctx, c.Context) + + return cmd, ctx, nil +} + +// command is the internal structure used on the wire only. +type command struct { + CommandType eh.CommandType `json:"command_type"` + Command json.RawMessage `json:"command"` + Context map[string]interface{} `json:"context"` +} diff --git a/codec/json/command_test.go b/codec/json/command_test.go new file mode 100644 index 00000000..7cda5597 --- /dev/null +++ b/codec/json/command_test.go @@ -0,0 +1,48 @@ +// Copyright (c) 2021 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package json + +import ( + "strings" + "testing" + + "github.com/looplab/eventhorizon/codec" +) + +func TestCommandCodec(t *testing.T) { + c := &CommandCodec{} + + expectedBytes := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(` + { + "command_type": "CodecCommand", + "command": { + "ID": "10a7ec0f-7f2b-46f5-bca1-877b6e33c9fd", + "Bool": true, + "String": "string", + "Number": 42, + "Slice": ["a", "b"], + "Map": { "key": "value" }, + "Time": "2009-11-10T23:00:00Z", + "TimeRef": "2009-11-10T23:00:00Z", + "NullTime": null, + "Struct": { "Bool": true, "String": "string", "Number": 42 }, + "StructRef": { "Bool": true, "String": "string", "Number": 42 }, + "NullStruct": null + }, + "context": { "context_one": "testval" } + }`, " ", ""), "\n", ""), "\t", "") + + codec.CommandCodecAcceptanceTest(t, c, []byte(expectedBytes)) +} diff --git a/codec/json/codec.go b/codec/json/event.go similarity index 100% rename from codec/json/codec.go rename to codec/json/event.go diff --git a/codec/json/codec_test.go b/codec/json/event_test.go similarity index 99% rename from codec/json/codec_test.go rename to codec/json/event_test.go index 53e686e5..ba75059c 100644 --- a/codec/json/codec_test.go +++ b/codec/json/event_test.go @@ -23,6 +23,7 @@ import ( func TestEventCodec(t *testing.T) { c := &EventCodec{} + expectedBytes := strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(` { "event_type": "CodecEvent", @@ -46,5 +47,6 @@ func TestEventCodec(t *testing.T) { "metadata": { "num": 42 }, "context": { "context_one": "testval" } }`, " ", ""), "\n", ""), "\t", "") + codec.EventCodecAcceptanceTest(t, c, []byte(expectedBytes)) } diff --git a/context.go b/context.go index ae43992b..2694461a 100644 --- a/context.go +++ b/context.go @@ -182,3 +182,11 @@ func UnmarshalContext(ctx context.Context, vals map[string]interface{}) context. return ctx } + +// CopyContext copies all values that are registered and exists in the `from` +// context to the `to` context. It basically runs a marshal/unmarshal back-to-back. +func CopyContext(from, to context.Context) context.Context { + vals := MarshalContext(from) + + return UnmarshalContext(to, vals) +} diff --git a/context_test.go b/context_test.go index e02039f1..aebbf471 100644 --- a/context_test.go +++ b/context_test.go @@ -81,6 +81,24 @@ func TestContextUnmarshaler(t *testing.T) { } } +func TestCopyContext(t *testing.T) { + if len(contextMarshalFuncs) != 2 { + t.Error("there should be two context marshalers") + } + + if len(contextUnmarshalFuncs) != 2 { + t.Error("there should be two context unmarshalers") + } + + from := WithContextTestOne(context.Background(), "testval") + + to := CopyContext(from, context.Background()) + + if val, ok := ContextTestOne(to); !ok || val != "testval" { + t.Error("the copied context should be correct:", val) + } +} + type contextTestKey int const ( diff --git a/examples/todomvc/Makefile b/examples/todomvc/Makefile index 8eb4705b..a3c77d9a 100644 --- a/examples/todomvc/Makefile +++ b/examples/todomvc/Makefile @@ -18,7 +18,7 @@ stop: .PHONY: run_backend run_backend: - go run -v backend/*.go + go run backend/*.go .PHONY: build_frontend build_frontend: diff --git a/middleware/commandhandler/scheduler/middleware.go b/middleware/commandhandler/scheduler/middleware.go index 7d7483a6..7ae76c0a 100644 --- a/middleware/commandhandler/scheduler/middleware.go +++ b/middleware/commandhandler/scheduler/middleware.go @@ -16,47 +16,20 @@ package scheduler import ( "context" + "errors" "fmt" + "sync" "time" eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" ) -// NewMiddleware returns a new async handling middleware that returns any errors -// on a error channel. -func NewMiddleware() (eh.CommandHandlerMiddleware, chan *Error) { - errCh := make(chan *Error, 20) +// The default command queue size to use. +var ScheduledCommandsQueueSize = 100 - return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler { - return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { - // Delayed command execution if there is time set. - if c, ok := cmd.(Command); ok && !c.ExecuteAt().IsZero() { - go func() { - t := time.NewTimer(time.Until(c.ExecuteAt())) - defer t.Stop() - - var err error - select { - case <-ctx.Done(): - err = ctx.Err() - case <-t.C: - err = h.HandleCommand(ctx, cmd) - } - - if err != nil { - // Always try to deliver errors. - errCh <- &Error{err, ctx, cmd} - } - }() - - return nil - } - - // Immediate command execution. - return h.HandleCommand(ctx, cmd) - }) - }), errCh -} +// ErrCanceled is when a scheduled command has been canceled. +var ErrCanceled = errors.New("canceled") // Command is a scheduled command with an execution time. type Command interface { @@ -82,6 +55,308 @@ func (c *command) ExecuteAt() time.Time { return c.t } +// NewMiddleware returns a new command handler middleware and a scheduler helper. +func NewMiddleware(repo eh.ReadWriteRepo, codec eh.CommandCodec) (eh.CommandHandlerMiddleware, *Scheduler) { + s := &Scheduler{ + repo: repo, + cmdCh: make(chan *scheduledCommand, ScheduledCommandsQueueSize), + cancelScheduling: map[uuid.UUID]chan struct{}{}, + errCh: make(chan error, 100), + codec: codec, + } + + return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler { + s.setHandler(h) + + return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { + // Delayed command execution if there is time set. + if c, ok := cmd.(Command); ok && !c.ExecuteAt().IsZero() { + // Use the wrapped command when created by the helper func. + innerCmd, ok := c.(*command) + if ok { + cmd = innerCmd.Command + } + + // Ignore the persisted command ID in this case. + _, err := s.ScheduleCommand(ctx, cmd, c.ExecuteAt()) + + return err + } + + // Immediate command execution. + return h.HandleCommand(ctx, cmd) + }) + }), s +} + +// PersistedCommand is a persisted command. +type PersistedCommand struct { + ID uuid.UUID `json:"_" bson:"_id"` + IDStr string `json:"id" bson:"_"` + RawCommand []byte `json:"command" bson:"command"` + ExecuteAt time.Time `json:"timestamp" bson:"timestamp"` + Command eh.Command `json:"-" bson:"-"` + Context context.Context `json:"-" bson:"-"` +} + +// EntityID implements the EntityID method of the eventhorizon.Entity interface. +func (c *PersistedCommand) EntityID() uuid.UUID { + return c.ID +} + +// Scheduler is a scheduled of commands. +type Scheduler struct { + h eh.CommandHandler + hMu sync.Mutex + repo eh.ReadWriteRepo + cmdCh chan *scheduledCommand + cancelScheduling map[uuid.UUID]chan struct{} + cancelSchedulingMu sync.Mutex + errCh chan error + cctx context.Context + cancel context.CancelFunc + done chan struct{} + codec eh.CommandCodec +} + +func (s *Scheduler) setHandler(h eh.CommandHandler) { + s.hMu.Lock() + defer s.hMu.Unlock() + + if s.h != nil { + panic("eventhorizon: handler already set for outbox") + } + + s.h = h +} + +// Load loads all persisted scheduled commands. It will be limited +// by ScheduledCommandsQueueSize if Start() has not yet been called. +func (s *Scheduler) Load(ctx context.Context) error { + commands, err := s.Commands(ctx) + if err != nil { + return fmt.Errorf("could not load scheduled commands: %w", err) + } + + for _, pc := range commands { + sc := &scheduledCommand{ + id: pc.ID, + ctx: pc.Context, + cmd: pc.Command, + executeAt: pc.ExecuteAt, + } + + select { + case s.cmdCh <- sc: + default: + return fmt.Errorf("could not schedule command, command queue full") + } + } + + return nil +} + +// Start starts the scheduler. +func (s *Scheduler) Start() error { + if s.h == nil { + return fmt.Errorf("command handler not set") + } + + s.cctx, s.cancel = context.WithCancel(context.Background()) + s.done = make(chan struct{}) + + go s.run() + + return nil +} + +// Stop stops all scheduled commands. +func (s *Scheduler) Stop() error { + s.cancel() + + <-s.done + + return nil +} + +// Errors returns an error channel that will receive errors from handling of +// scheduled commands. +func (s *Scheduler) Errors() <-chan error { + return s.errCh +} + +type scheduledCommand struct { + id uuid.UUID + ctx context.Context + cmd eh.Command + executeAt time.Time +} + +// ScheduleCommand schedules a command to be executed at `executeAt`. It is persisted +// to the repo. +func (s *Scheduler) ScheduleCommand(ctx context.Context, cmd eh.Command, executeAt time.Time) (uuid.UUID, error) { + b, err := s.codec.MarshalCommand(ctx, cmd) + if err != nil { + return uuid.Nil, &Error{ + Err: fmt.Errorf("could not marshal command: %w", err), + Ctx: ctx, + Command: cmd, + } + } + + // Use the command ID as persisted ID if available. + var id uuid.UUID + if cmd, ok := cmd.(eh.CommandIDer); ok { + id = cmd.CommandID() + } else { + id = uuid.New() + } + + pc := &PersistedCommand{ + ID: id, + IDStr: id.String(), + RawCommand: b, + ExecuteAt: executeAt, + } + + if err := s.repo.Save(ctx, pc); err != nil { + return uuid.Nil, &Error{ + Err: fmt.Errorf("could not persist command: %w", err), + Ctx: ctx, + Command: cmd, + } + } + + select { + case s.cmdCh <- &scheduledCommand{id, ctx, cmd, executeAt}: + default: + return uuid.Nil, &Error{ + Err: fmt.Errorf("command queue full"), + Ctx: ctx, + Command: cmd, + } + } + + return pc.ID, nil +} + +// Commands returns all scheduled commands. +func (s *Scheduler) Commands(ctx context.Context) ([]*PersistedCommand, error) { + entities, err := s.repo.FindAll(ctx) + if err != nil { + return nil, fmt.Errorf("could not load scheduled commands: %w", err) + } + + commands := make([]*PersistedCommand, len(entities)) + + for i, entity := range entities { + c, ok := entity.(*PersistedCommand) + if !ok { + return nil, fmt.Errorf("command is not schedulable: %T", entity) + } + + if c.Command, c.Context, err = s.codec.UnmarshalCommand(ctx, c.RawCommand); err != nil { + return nil, fmt.Errorf("could not unmarshal command: %w", err) + } + + c.RawCommand = nil + + if c.IDStr != "" { + id, err := uuid.Parse(c.IDStr) + if err != nil { + return nil, fmt.Errorf("could not parse command ID: %w", err) + } + + c.ID = id + } + + commands[i] = c + } + + return commands, nil +} + +// CancelCommand cancels a scheduled command. +func (s *Scheduler) CancelCommand(ctx context.Context, id uuid.UUID) error { + s.cancelSchedulingMu.Lock() + defer s.cancelSchedulingMu.Unlock() + + cancel, ok := s.cancelScheduling[id] + if !ok { + return fmt.Errorf("command %s not scheduled", id) + } + + close(cancel) + + return nil +} + +func (s *Scheduler) run() { + var wg sync.WaitGroup + +loop: + for { + select { + case <-s.cctx.Done(): + break loop + case sc := <-s.cmdCh: + wg.Add(1) + + s.cancelSchedulingMu.Lock() + cancel := make(chan struct{}) + s.cancelScheduling[sc.id] = cancel + s.cancelSchedulingMu.Unlock() + + go func(cancel chan struct{}) { + defer wg.Done() + + t := time.NewTimer(time.Until(sc.executeAt)) + defer t.Stop() + + select { + case <-s.cctx.Done(): + // Stop without removing persisted cmd. + case <-t.C: + if err := s.h.HandleCommand(sc.ctx, sc.cmd); err != nil { + // Always try to deliver errors. + s.errCh <- &Error{ + Err: err, + Ctx: sc.ctx, + Command: sc.cmd, + } + } + + if err := s.repo.Remove(sc.ctx, sc.id); err != nil { + s.errCh <- &Error{ + Err: fmt.Errorf("could not remove persisted command: %w", err), + Ctx: sc.ctx, + Command: sc.cmd, + } + } + case <-cancel: + if err := s.repo.Remove(sc.ctx, sc.id); err != nil { + s.errCh <- &Error{ + Err: fmt.Errorf("could not remove persisted command: %w", err), + Ctx: sc.ctx, + Command: sc.cmd, + } + } + + s.errCh <- &Error{ + Err: ErrCanceled, + Ctx: sc.ctx, + Command: sc.cmd, + } + } + }(cancel) + } + } + + wg.Wait() + + close(s.done) +} + // Error is an async error containing the error and the command. type Error struct { // Err is the error that happened when handling the command. diff --git a/middleware/commandhandler/scheduler/middleware_test.go b/middleware/commandhandler/scheduler/middleware_test.go index 7b42e356..6a84df1e 100644 --- a/middleware/commandhandler/scheduler/middleware_test.go +++ b/middleware/commandhandler/scheduler/middleware_test.go @@ -16,21 +16,40 @@ package scheduler import ( "context" + "crypto/rand" + "encoding/hex" "errors" + "os" "reflect" "testing" "time" eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/codec/bson" + "github.com/looplab/eventhorizon/codec/json" "github.com/looplab/eventhorizon/mocks" + "github.com/looplab/eventhorizon/repo/memory" + "github.com/looplab/eventhorizon/repo/mongodb" "github.com/looplab/eventhorizon/uuid" ) +func init() { + eh.RegisterCommand(func() eh.Command { return &mocks.Command{} }) +} + func TestMiddleware_Immediate(t *testing.T) { + repo := &mocks.Repo{} + m, s := NewMiddleware(repo, &json.CommandCodec{}) + inner := &mocks.CommandHandler{} - m, _ := NewMiddleware() + h := eh.UseCommandHandlerMiddleware(inner, m) - cmd := mocks.Command{ + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) + } + + cmd := &mocks.Command{ ID: uuid.New(), Content: "content", } @@ -42,13 +61,26 @@ func TestMiddleware_Immediate(t *testing.T) { if !reflect.DeepEqual(inner.Commands, []eh.Command{cmd}) { t.Error("the command should have been handled:", inner.Commands) } + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) + } } func TestMiddleware_Delayed(t *testing.T) { + repo := &mocks.Repo{} + + m, s := NewMiddleware(repo, &json.CommandCodec{}) + inner := &mocks.CommandHandler{} - m, _ := NewMiddleware() + h := eh.UseCommandHandlerMiddleware(inner, m) - cmd := mocks.Command{ + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) + } + + cmd := &mocks.Command{ ID: uuid.New(), Content: "content", } @@ -65,81 +97,204 @@ func TestMiddleware_Delayed(t *testing.T) { inner.RUnlock() time.Sleep(10 * time.Millisecond) + inner.RLock() - if !reflect.DeepEqual(inner.Commands, []eh.Command{c}) { + if !reflect.DeepEqual(inner.Commands, []eh.Command{cmd}) { t.Error("the command should have been handled:", inner.Commands) } inner.RUnlock() + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) + } } -func TestMiddleware_ZeroTime(t *testing.T) { - inner := &mocks.CommandHandler{} - m, _ := NewMiddleware() - h := eh.UseCommandHandlerMiddleware(inner, m) - cmd := mocks.Command{ - ID: uuid.New(), - Content: "content", +func TestMiddleware_Persisted(t *testing.T) { + repo := memory.NewRepo() + + repo.SetEntityFactory(func() eh.Entity { return &PersistedCommand{} }) + + testMiddleware_Persisted(t, repo, &json.CommandCodec{}) +} + +func TestMiddleware_PersistedIntegration(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") } - c := CommandWithExecuteTime(cmd, time.Time{}) - if err := h.HandleCommand(context.Background(), c); err != nil { + // Use MongoDB in Docker with fallback to localhost. + addr := os.Getenv("MONGODB_ADDR") + if addr == "" { + addr = "localhost:27017" + } + + url := "mongodb://" + addr + + // Get a random DB name. + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + t.Fatal(err) + } + + db := "test-" + hex.EncodeToString(b) + + t.Log("using DB:", db) + + repo, err := mongodb.NewRepo(url, db, "scheduled_commands") + if err != nil { t.Error("there should be no error:", err) } - if !reflect.DeepEqual(inner.Commands, []eh.Command{c}) { - t.Error("the command should have been handled:", inner.Commands) + if repo == nil { + t.Error("there should be a repository") } + + defer repo.Close() + + repo.SetEntityFactory(func() eh.Entity { return &PersistedCommand{} }) + + testMiddleware_Persisted(t, repo, &bson.CommandCodec{}) } -func TestMiddleware_Errors(t *testing.T) { - handlerErr := errors.New("handler error") - inner := &mocks.CommandHandler{ - Err: handlerErr, - } - m, errCh := NewMiddleware() +func testMiddleware_Persisted(t *testing.T, repo eh.ReadWriteRepo, codec eh.CommandCodec) { + m, s := NewMiddleware(repo, codec) + + inner := &mocks.CommandHandler{} + h := eh.UseCommandHandlerMiddleware(inner, m) - cmd := mocks.Command{ + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) + } + + if err := s.Load(context.Background()); err != nil { + t.Error("there should be no error:", err) + } + + cmd := &mocks.Command{ ID: uuid.New(), Content: "content", } - c := CommandWithExecuteTime(cmd, time.Now().Add(5*time.Millisecond)) + c := CommandWithExecuteTime(cmd, time.Now().Add(time.Second)) if err := h.HandleCommand(context.Background(), c); err != nil { t.Error("there should be no error:", err) } + time.Sleep(100 * time.Millisecond) + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) + } + + time.Sleep(100 * time.Millisecond) + + items, err := repo.FindAll(context.Background()) + if err != nil { + t.Error("there should be no error:", err) + } + + if len(items) != 1 { + t.Error("there should be a persisted command") + } + + inner.RLock() if len(inner.Commands) != 0 { t.Error("the command should not have been handled yet:", inner.Commands) } + inner.RUnlock() + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) + } + + if err := s.Load(context.Background()); err != nil { + t.Fatal("could not load scheduled commands:", err) + } + + time.Sleep(800 * time.Millisecond) + + inner.RLock() + if !reflect.DeepEqual(inner.Commands, []eh.Command{cmd}) { + t.Error("the command should have been handled:", inner.Commands) + } + inner.RUnlock() + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) + } + + items, err = repo.FindAll(context.Background()) + if err != nil { + t.Error("there should be no error:", err) + } + + if len(items) != 0 { + t.Error("there should be no persisted commands") + } - var err *Error select { - case err = <-errCh: + case err := <-s.Errors(): + if err != nil { + t.Error("there should be no error:", err) + } case <-time.After(10 * time.Millisecond): } +} - if err.Err != handlerErr { - t.Error("there should be an error:", err) +func TestMiddleware_ZeroTime(t *testing.T) { + repo := &mocks.Repo{} + m, s := NewMiddleware(repo, &json.CommandCodec{}) + + inner := &mocks.CommandHandler{} + + h := eh.UseCommandHandlerMiddleware(inner, m) + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) + } + + cmd := &mocks.Command{ + ID: uuid.New(), + Content: "content", + } + c := CommandWithExecuteTime(cmd, time.Time{}) + + if err := h.HandleCommand(context.Background(), c); err != nil { + t.Error("there should be no error:", err) + } + + if !reflect.DeepEqual(inner.Commands, []eh.Command{c}) { + t.Error("the command should have been handled:", inner.Commands) + } + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) } } -func TestMiddleware_ContextCanceled(t *testing.T) { +func TestMiddleware_Errors(t *testing.T) { + repo := &mocks.Repo{} + m, s := NewMiddleware(repo, &json.CommandCodec{}) + handlerErr := errors.New("handler error") inner := &mocks.CommandHandler{ Err: handlerErr, } - m, errCh := NewMiddleware() + h := eh.UseCommandHandlerMiddleware(inner, m) - cmd := mocks.Command{ + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) + } + + cmd := &mocks.Command{ ID: uuid.New(), Content: "content", } c := CommandWithExecuteTime(cmd, time.Now().Add(5*time.Millisecond)) - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - if err := h.HandleCommand(ctx, c); err != nil { + if err := h.HandleCommand(context.Background(), c); err != nil { t.Error("there should be no error:", err) } @@ -147,50 +302,85 @@ func TestMiddleware_ContextCanceled(t *testing.T) { t.Error("the command should not have been handled yet:", inner.Commands) } - var err *Error + var err error select { - case err = <-errCh: + case err = <-s.Errors(): case <-time.After(10 * time.Millisecond): } - canceledErr := context.Canceled - if !errors.Is(err, canceledErr) { + if !errors.Is(err, handlerErr) { t.Error("there should be an error:", err) } + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) + } } -func TestMiddleware_ContextDeadline(t *testing.T) { - handlerErr := errors.New("handler error") - inner := &mocks.CommandHandler{ - Err: handlerErr, +func TestMiddleware_Cancel(t *testing.T) { + repo := memory.NewRepo() + + repo.SetEntityFactory(func() eh.Entity { return &PersistedCommand{} }) + + m, s := NewMiddleware(repo, &json.CommandCodec{}) + + inner := &mocks.CommandHandler{} + + // Handler is not used in this case. + eh.UseCommandHandlerMiddleware(inner, m) + + if err := s.Start(); err != nil { + t.Fatal("could not start scheduler:", err) } - m, errCh := NewMiddleware() - h := eh.UseCommandHandlerMiddleware(inner, m) - cmd := mocks.Command{ + + nonExistingID := uuid.New() + if err := s.CancelCommand(context.Background(), nonExistingID); err == nil || + err.Error() != "command "+nonExistingID.String()+" not scheduled" { + t.Error("there should be an error:", err) + } + + cmd := &mocks.Command{ ID: uuid.New(), Content: "content", } - c := CommandWithExecuteTime(cmd, time.Now().Add(5*time.Millisecond)) - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() + id, err := s.ScheduleCommand(context.Background(), cmd, time.Now().Add(50*time.Millisecond)) + if err != nil { + t.Error("there should be no error:", err) + } - if err := h.HandleCommand(ctx, c); err != nil { + items, err := repo.FindAll(context.Background()) + if err != nil { t.Error("there should be no error:", err) } - if len(inner.Commands) != 0 { - t.Error("the command should not have been handled yet:", inner.Commands) + if len(items) != 1 { + t.Error("there should be a persisted command") + } + + if err := s.CancelCommand(context.Background(), id); err != nil { + t.Error("there should be no error:", err) } - var err *Error select { - case err = <-errCh: + case err = <-s.Errors(): case <-time.After(10 * time.Millisecond): } - deadlineExceededErr := context.DeadlineExceeded - if !errors.Is(err, deadlineExceededErr) { + if !errors.Is(err, ErrCanceled) { t.Error("there should be an error:", err) } + + if err := s.Stop(); err != nil { + t.Fatal("could not stop scheduler:", err) + } + + items, err = repo.FindAll(context.Background()) + if err != nil { + t.Error("there should be no error:", err) + } + + if len(items) != 0 { + t.Error("there should be no persisted commands") + } }