diff --git a/service/history/api/respondworkflowtaskcompleted/api_test.go b/service/history/api/respondworkflowtaskcompleted/api_test.go index e78e8e9d2d0..a2e13b293b5 100644 --- a/service/history/api/respondworkflowtaskcompleted/api_test.go +++ b/service/history/api/respondworkflowtaskcompleted/api_test.go @@ -63,6 +63,7 @@ import ( "go.temporal.io/server/service/history/api" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" @@ -118,7 +119,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) SetupSubTest() { s.NoError(err) s.mockShard.SetStateMachineRegistry(reg) - mockEngine := shard.NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes() mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes() s.mockShard.SetEngineForTesting(mockEngine) diff --git a/service/history/history_engine.go b/service/history/history_engine.go index b4339043594..5508aea2c17 100644 --- a/service/history/history_engine.go +++ b/service/history/history_engine.go @@ -106,6 +106,7 @@ import ( "go.temporal.io/server/service/history/deletemanager" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/ndc" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/replication" @@ -183,7 +184,7 @@ func NewEngineWithShardContext( dlqWriter replication.DLQWriter, commandHandlerRegistry *workflow.CommandHandlerRegistry, outboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool, -) shard.Engine { +) historyi.Engine { currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() logger := shard.GetLogger() @@ -759,14 +760,14 @@ func (e *historyEngineImpl) SyncActivities( func (e *historyEngineImpl) SyncHSM( ctx context.Context, - request *shard.SyncHSMRequest, + request *historyi.SyncHSMRequest, ) error { return e.nDCHSMStateReplicator.SyncHSMState(ctx, request) } func (e *historyEngineImpl) BackfillHistoryEvents( ctx context.Context, - request *shard.BackfillHistoryEventsRequest, + request *historyi.BackfillHistoryEventsRequest, ) error { return e.nDCHistoryReplicator.BackfillHistoryEvents(ctx, request) } diff --git a/service/history/history_engine_factory.go b/service/history/history_engine_factory.go index de90aa4fb3c..4bea26083b3 100644 --- a/service/history/history_engine_factory.go +++ b/service/history/history_engine_factory.go @@ -36,6 +36,7 @@ import ( "go.temporal.io/server/service/history/circuitbreakerpool" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/replication" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -77,7 +78,7 @@ type ( func (f *historyEngineFactory) CreateEngine( shard shard.Context, -) shard.Engine { +) historyi.Engine { var wfCache wcache.Cache if shard.GetConfig().EnableHostLevelHistoryCache() { wfCache = f.WorkflowCache diff --git a/service/history/shard/engine.go b/service/history/interfaces/engine.go similarity index 95% rename from service/history/shard/engine.go rename to service/history/interfaces/engine.go index 8fdbcd277d7..c119607c7f8 100644 --- a/service/history/shard/engine.go +++ b/service/history/interfaces/engine.go @@ -24,7 +24,7 @@ //go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination engine_mock.go -package shard +package interfaces import ( "context" @@ -37,7 +37,6 @@ import ( persistencespb "go.temporal.io/server/api/persistence/v1" replicationspb "go.temporal.io/server/api/replication/v1" workflowspb "go.temporal.io/server/api/workflow/v1" - "go.temporal.io/server/common/collection" "go.temporal.io/server/common/definition" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/namespace" @@ -130,23 +129,6 @@ type ( Start() Stop() } - - ReplicationStream interface { - SubscribeReplicationNotification() (<-chan struct{}, string) - UnsubscribeReplicationNotification(string) - ConvertReplicationTask( - ctx context.Context, - task tasks.Task, - clusterID int32, - ) (*replicationspb.ReplicationTask, error) - GetReplicationTasksIter( - ctx context.Context, - pollingCluster string, - minInclusiveTaskID int64, - maxExclusiveTaskID int64, - ) (collection.Iterator[tasks.Task], error) - GetMaxReplicationTaskInfo() (int64, time.Time) - } ) type ( diff --git a/service/history/shard/engine_mock.go b/service/history/interfaces/engine_mock.go similarity index 92% rename from service/history/shard/engine_mock.go rename to service/history/interfaces/engine_mock.go index b6e996738e7..76923ffde4d 100644 --- a/service/history/shard/engine_mock.go +++ b/service/history/interfaces/engine_mock.go @@ -27,11 +27,11 @@ // // Generated by this command: // -// mockgen -copyright_file ../../../LICENSE -package shard -source engine.go -destination engine_mock.go +// mockgen -copyright_file ../../../LICENSE -package interfaces -source engine.go -destination engine_mock.go // -// Package shard is a generated GoMock package. -package shard +// Package interfaces is a generated GoMock package. +package interfaces import ( context "context" @@ -1140,98 +1140,3 @@ func (mr *MockEngineMockRecorder) VerifyFirstWorkflowTaskScheduled(ctx, request mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "VerifyFirstWorkflowTaskScheduled", reflect.TypeOf((*MockEngine)(nil).VerifyFirstWorkflowTaskScheduled), ctx, request) } - -// MockReplicationStream is a mock of ReplicationStream interface. -type MockReplicationStream struct { - ctrl *gomock.Controller - recorder *MockReplicationStreamMockRecorder -} - -// MockReplicationStreamMockRecorder is the mock recorder for MockReplicationStream. -type MockReplicationStreamMockRecorder struct { - mock *MockReplicationStream -} - -// NewMockReplicationStream creates a new mock instance. -func NewMockReplicationStream(ctrl *gomock.Controller) *MockReplicationStream { - mock := &MockReplicationStream{ctrl: ctrl} - mock.recorder = &MockReplicationStreamMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockReplicationStream) EXPECT() *MockReplicationStreamMockRecorder { - return m.recorder -} - -// ConvertReplicationTask mocks base method. -func (m *MockReplicationStream) ConvertReplicationTask(ctx context.Context, task tasks.Task, clusterID int32) (*repication.ReplicationTask, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ConvertReplicationTask", ctx, task, clusterID) - ret0, _ := ret[0].(*repication.ReplicationTask) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ConvertReplicationTask indicates an expected call of ConvertReplicationTask. -func (mr *MockReplicationStreamMockRecorder) ConvertReplicationTask(ctx, task, clusterID any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConvertReplicationTask", reflect.TypeOf((*MockReplicationStream)(nil).ConvertReplicationTask), ctx, task, clusterID) -} - -// GetMaxReplicationTaskInfo mocks base method. -func (m *MockReplicationStream) GetMaxReplicationTaskInfo() (int64, time.Time) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetMaxReplicationTaskInfo") - ret0, _ := ret[0].(int64) - ret1, _ := ret[1].(time.Time) - return ret0, ret1 -} - -// GetMaxReplicationTaskInfo indicates an expected call of GetMaxReplicationTaskInfo. -func (mr *MockReplicationStreamMockRecorder) GetMaxReplicationTaskInfo() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMaxReplicationTaskInfo", reflect.TypeOf((*MockReplicationStream)(nil).GetMaxReplicationTaskInfo)) -} - -// GetReplicationTasksIter mocks base method. -func (m *MockReplicationStream) GetReplicationTasksIter(ctx context.Context, pollingCluster string, minInclusiveTaskID, maxExclusiveTaskID int64) (collection.Iterator[tasks.Task], error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetReplicationTasksIter", ctx, pollingCluster, minInclusiveTaskID, maxExclusiveTaskID) - ret0, _ := ret[0].(collection.Iterator[tasks.Task]) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetReplicationTasksIter indicates an expected call of GetReplicationTasksIter. -func (mr *MockReplicationStreamMockRecorder) GetReplicationTasksIter(ctx, pollingCluster, minInclusiveTaskID, maxExclusiveTaskID any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReplicationTasksIter", reflect.TypeOf((*MockReplicationStream)(nil).GetReplicationTasksIter), ctx, pollingCluster, minInclusiveTaskID, maxExclusiveTaskID) -} - -// SubscribeReplicationNotification mocks base method. -func (m *MockReplicationStream) SubscribeReplicationNotification() (<-chan struct{}, string) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SubscribeReplicationNotification") - ret0, _ := ret[0].(<-chan struct{}) - ret1, _ := ret[1].(string) - return ret0, ret1 -} - -// SubscribeReplicationNotification indicates an expected call of SubscribeReplicationNotification. -func (mr *MockReplicationStreamMockRecorder) SubscribeReplicationNotification() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeReplicationNotification", reflect.TypeOf((*MockReplicationStream)(nil).SubscribeReplicationNotification)) -} - -// UnsubscribeReplicationNotification mocks base method. -func (m *MockReplicationStream) UnsubscribeReplicationNotification(arg0 string) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "UnsubscribeReplicationNotification", arg0) -} - -// UnsubscribeReplicationNotification indicates an expected call of UnsubscribeReplicationNotification. -func (mr *MockReplicationStreamMockRecorder) UnsubscribeReplicationNotification(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnsubscribeReplicationNotification", reflect.TypeOf((*MockReplicationStream)(nil).UnsubscribeReplicationNotification), arg0) -} diff --git a/service/history/interfaces/replication_stream.go b/service/history/interfaces/replication_stream.go new file mode 100644 index 00000000000..c3e116962fe --- /dev/null +++ b/service/history/interfaces/replication_stream.go @@ -0,0 +1,53 @@ +// The MIT License +// +// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package interfaces + +import ( + "context" + "time" + + replicationspb "go.temporal.io/server/api/replication/v1" + "go.temporal.io/server/common/collection" + "go.temporal.io/server/service/history/tasks" +) + +type ( + ReplicationStream interface { + SubscribeReplicationNotification() (<-chan struct{}, string) + UnsubscribeReplicationNotification(string) + ConvertReplicationTask( + ctx context.Context, + task tasks.Task, + clusterID int32, + ) (*replicationspb.ReplicationTask, error) + + GetReplicationTasksIter( + ctx context.Context, + pollingCluster string, + minInclusiveTaskID int64, + maxExclusiveTaskID int64, + ) (collection.Iterator[tasks.Task], error) + + GetMaxReplicationTaskInfo() (int64, time.Time) + } +) diff --git a/service/history/ndc/history_replicator.go b/service/history/ndc/history_replicator.go index d2225406931..09731b2325f 100644 --- a/service/history/ndc/history_replicator.go +++ b/service/history/ndc/history_replicator.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/common/primitives/timestamp" serviceerrors "go.temporal.io/server/common/serviceerror" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" wcache "go.temporal.io/server/service/history/workflow/cache" @@ -113,7 +114,7 @@ type ( newEvents []*historypb.HistoryEvent, newRunID string, ) error - BackfillHistoryEvents(ctx context.Context, request *shard.BackfillHistoryEventsRequest) error + BackfillHistoryEvents(ctx context.Context, request *historyi.BackfillHistoryEventsRequest) error } HistoryReplicatorImpl struct { @@ -229,7 +230,7 @@ func (r *HistoryReplicatorImpl) ApplyEvents( func (r *HistoryReplicatorImpl) BackfillHistoryEvents( ctx context.Context, - request *shard.BackfillHistoryEventsRequest, + request *historyi.BackfillHistoryEventsRequest, ) error { task, err := newReplicationTaskFromBatch( r.clusterMetadata, diff --git a/service/history/ndc/hsm_state_replicator.go b/service/history/ndc/hsm_state_replicator.go index 722a9c2ce9d..4f7e23e9a1d 100644 --- a/service/history/ndc/hsm_state_replicator.go +++ b/service/history/ndc/hsm_state_replicator.go @@ -41,6 +41,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" wcache "go.temporal.io/server/service/history/workflow/cache" @@ -50,7 +51,7 @@ type ( HSMStateReplicator interface { SyncHSMState( ctx context.Context, - request *shard.SyncHSMRequest, + request *historyi.SyncHSMRequest, ) error } @@ -76,7 +77,7 @@ func NewHSMStateReplicator( func (r *HSMStateReplicatorImpl) SyncHSMState( ctx context.Context, - request *shard.SyncHSMRequest, + request *historyi.SyncHSMRequest, ) (retError error) { namespaceID := namespace.ID(request.WorkflowKey.GetNamespaceID()) execution := &commonpb.WorkflowExecution{ @@ -150,7 +151,7 @@ func (r *HSMStateReplicatorImpl) SyncHSMState( func (r *HSMStateReplicatorImpl) syncHSMNode( mutableState workflow.MutableState, - request *shard.SyncHSMRequest, + request *historyi.SyncHSMRequest, ) (bool, error) { shouldSync, err := r.compareVersionHistory(mutableState, request.EventVersionHistory) diff --git a/service/history/ndc/hsm_state_replicator_mock.go b/service/history/ndc/hsm_state_replicator_mock.go index 17b9e7fff6c..7f8899a2feb 100644 --- a/service/history/ndc/hsm_state_replicator_mock.go +++ b/service/history/ndc/hsm_state_replicator_mock.go @@ -37,7 +37,7 @@ import ( context "context" reflect "reflect" - shard "go.temporal.io/server/service/history/shard" + interfaces "go.temporal.io/server/service/history/interfaces" gomock "go.uber.org/mock/gomock" ) @@ -65,7 +65,7 @@ func (m *MockHSMStateReplicator) EXPECT() *MockHSMStateReplicatorMockRecorder { } // SyncHSMState mocks base method. -func (m *MockHSMStateReplicator) SyncHSMState(ctx context.Context, request *shard.SyncHSMRequest) error { +func (m *MockHSMStateReplicator) SyncHSMState(ctx context.Context, request *interfaces.SyncHSMRequest) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SyncHSMState", ctx, request) ret0, _ := ret[0].(error) diff --git a/service/history/ndc/hsm_state_replicator_test.go b/service/history/ndc/hsm_state_replicator_test.go index c623627c448..472c9d80fdd 100644 --- a/service/history/ndc/hsm_state_replicator_test.go +++ b/service/history/ndc/hsm_state_replicator_test.go @@ -44,6 +44,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/hsm/hsmtest" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -94,7 +95,7 @@ func (s *hsmStateReplicatorSuite) SetupTest() { }, tests.NewDynamicConfig(), ) - mockEngine := shard.NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes() mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes() mockEngine.EXPECT().Stop().MaxTimes(1) @@ -154,7 +155,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_WorkflowNotFound() { }).Return(nil, serviceerror.NewNotFound("")).Times(1) lastEventID := int64(10) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: nonExistKey, EventVersionHistory: &historyspb.VersionHistory{ Items: []*historyspb.VersionHistoryItem{ @@ -187,7 +188,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_Diverge_LocalEventVersionLarger() DBRecordVersion: 777, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: &historyspb.VersionHistory{ Items: []*historyspb.VersionHistoryItem{ @@ -212,7 +213,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_Diverge_IncomingEventVersionLarger DBRecordVersion: 777, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: &historyspb.VersionHistory{ Items: []*historyspb.VersionHistoryItem{ @@ -255,7 +256,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_LocalEventVersionSuperSet() { }, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: &historyspb.VersionHistory{ Items: []*historyspb.VersionHistoryItem{ @@ -300,7 +301,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingEventVersionSuperSet() { DBRecordVersion: 777, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: &historyspb.VersionHistory{ Items: []*historyspb.VersionHistoryItem{ @@ -336,7 +337,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateStale() { DBRecordVersion: 777, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -374,7 +375,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingLastUpdateVersionStale() { DBRecordVersion: 777, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -413,7 +414,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingLastUpdateVersionedTransit DBRecordVersion: 777, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -459,7 +460,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingLastUpdateVersionNewer() { }, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -505,7 +506,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingLastUpdateVersionedTransit }, }, nil).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -569,7 +570,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowOpen() }, ).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -621,7 +622,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowZombie( }, ).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ @@ -677,7 +678,7 @@ func (s *hsmStateReplicatorSuite) TestSyncHSM_IncomingStateNewer_WorkflowClosed( }, ).Times(1) - err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &shard.SyncHSMRequest{ + err := s.nDCHSMStateReplicator.SyncHSMState(context.Background(), &historyi.SyncHSMRequest{ WorkflowKey: s.workflowKey, EventVersionHistory: persistedState.ExecutionInfo.VersionHistories.Histories[0], StateMachineNode: &persistencespb.StateMachineNode{ diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 6024789698b..9d283599146 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -60,6 +60,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/historybuilder" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/workflow" wcache "go.temporal.io/server/service/history/workflow/cache" @@ -192,7 +193,7 @@ func (r *WorkflowStateReplicatorImpl) SyncWorkflowState( } // we don't care about activity state here as activity can't run after workflow is closed. - return engine.SyncHSM(ctx, &shard.SyncHSMRequest{ + return engine.SyncHSM(ctx, &historyi.SyncHSMRequest{ WorkflowKey: ms.GetWorkflowKey(), StateMachineNode: &persistencespb.StateMachineNode{ Children: executionInfo.SubStateMachinesByType, @@ -582,7 +583,7 @@ func (r *WorkflowStateReplicatorImpl) backFillEvents( } newRunID = newRunInfo.RunId } - return engine.BackfillHistoryEvents(ctx, &shard.BackfillHistoryEventsRequest{ + return engine.BackfillHistoryEvents(ctx, &historyi.BackfillHistoryEventsRequest{ WorkflowKey: definition.NewWorkflowKey(namespaceID.String(), workflowID, runID), SourceClusterName: sourceClusterName, VersionedHistory: destinationVersionedTransition, diff --git a/service/history/ndc/workflow_state_replicator_test.go b/service/history/ndc/workflow_state_replicator_test.go index 72ae60c1f98..ac325fbb5d7 100644 --- a/service/history/ndc/workflow_state_replicator_test.go +++ b/service/history/ndc/workflow_state_replicator_test.go @@ -55,6 +55,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.temporal.io/server/service/history/workflow" @@ -569,11 +570,11 @@ func (s *workflowReplicatorSuite) Test_ApplyWorkflowState_ExistWorkflow_SyncHSM( }) mockMutableState.EXPECT().GetWorkflowKey().Return(definition.NewWorkflowKey(namespaceID, s.workflowID, s.runID)).AnyTimes() - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.mockShard.SetEngineForTesting(engine) currentVersionHistory, err := versionhistory.GetCurrentVersionHistory(versionHistories) s.NoError(err) - engine.EXPECT().SyncHSM(gomock.Any(), &shard.SyncHSMRequest{ + engine.EXPECT().SyncHSM(gomock.Any(), &historyi.SyncHSMRequest{ WorkflowKey: definition.NewWorkflowKey(namespaceID, s.workflowID, s.runID), StateMachineNode: &persistencespb.StateMachineNode{ Children: request.WorkflowState.ExecutionInfo.SubStateMachinesByType, diff --git a/service/history/replication/eventhandler/event_importer.go b/service/history/replication/eventhandler/event_importer.go index 96a13554867..969cfb11b1e 100644 --- a/service/history/replication/eventhandler/event_importer.go +++ b/service/history/replication/eventhandler/event_importer.go @@ -40,7 +40,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/versionhistory" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" ) const ( @@ -180,7 +180,7 @@ func (e *eventImporterImpl) ImportHistoryEventsFromBeginning( func invokeImportWorkflowExecutionCall( ctx context.Context, - historyEngine shard.Engine, + historyEngine historyi.Engine, workflowKey definition.WorkflowKey, historyBatches []*commonpb.DataBlob, versionHistory *historyspb.VersionHistory, diff --git a/service/history/replication/eventhandler/event_importer_test.go b/service/history/replication/eventhandler/event_importer_test.go index 56142a578b1..99b038b7140 100644 --- a/service/history/replication/eventhandler/event_importer_test.go +++ b/service/history/replication/eventhandler/event_importer_test.go @@ -43,7 +43,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence/serialization" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" "go.uber.org/mock/gomock" ) @@ -57,7 +57,7 @@ type ( remoteHistoryFetcher *MockHistoryPaginatedFetcher engineProvider historyEngineProvider eventImporter EventImporter - engine *shard.MockEngine + engine *historyi.MockEngine } ) @@ -91,8 +91,8 @@ func (s *eventImporterSuite) SetupTest() { s.logger = log.NewNoopLogger() s.eventSerializer = serialization.NewSerializer() s.remoteHistoryFetcher = NewMockHistoryPaginatedFetcher(s.controller) - s.engine = shard.NewMockEngine(s.controller) - s.engineProvider = func(ctx context.Context, namespaceId namespace.ID, workflowId string) (shard.Engine, error) { + s.engine = historyi.NewMockEngine(s.controller) + s.engineProvider = func(ctx context.Context, namespaceId namespace.ID, workflowId string) (historyi.Engine, error) { return s.engine, nil } s.eventImporter = NewEventImporter( diff --git a/service/history/replication/eventhandler/history_events_handler_test.go b/service/history/replication/eventhandler/history_events_handler_test.go index 7e28dc2ad56..2cea3784abf 100644 --- a/service/history/replication/eventhandler/history_events_handler_test.go +++ b/service/history/replication/eventhandler/history_events_handler_test.go @@ -40,6 +40,7 @@ import ( "go.temporal.io/server/common/definition" "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.uber.org/mock/gomock" ) @@ -122,7 +123,7 @@ func (s *historyEventHandlerSuite) TestHandleHistoryEvents_RemoteOnly() { RunID: runId, } shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(namespaceId), workflowId, @@ -200,7 +201,7 @@ func (s *historyEventHandlerSuite) TestHandleHistoryEvents_LocalAndRemote_Handle RunID: runId, } shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(namespaceId), workflowId, @@ -263,7 +264,7 @@ func (s *historyEventHandlerSuite) TestHandleLocalHistoryEvents_AlreadyExist() { RunID: runId, } shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(namespaceId), workflowId, @@ -318,7 +319,7 @@ func (s *historyEventHandlerSuite) TestHandleHistoryEvents_LocalOnly_ImportAllLo } shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(namespaceId), workflowId, @@ -375,7 +376,7 @@ func (s *historyEventHandlerSuite) TestHandleHistoryEvents_LocalOnly_ExistButNot } shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(namespaceId), workflowId, diff --git a/service/history/replication/eventhandler/resend_handler.go b/service/history/replication/eventhandler/resend_handler.go index 051998c76e3..688c30798f5 100644 --- a/service/history/replication/eventhandler/resend_handler.go +++ b/service/history/replication/eventhandler/resend_handler.go @@ -44,11 +44,11 @@ import ( "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/service/history/configs" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" ) type ( - historyEngineProvider func(ctx context.Context, namespaceId namespace.ID, workflowId string) (shard.Engine, error) + historyEngineProvider func(ctx context.Context, namespaceId namespace.ID, workflowId string) (historyi.Engine, error) ResendHandler interface { ResendHistoryEvents( ctx context.Context, @@ -80,7 +80,7 @@ func NewResendHandler( clientBean client.Bean, serializer serialization.Serializer, clusterMetadata cluster.Metadata, - historyEngineProvider func(ctx context.Context, namespaceId namespace.ID, workflowId string) (shard.Engine, error), + historyEngineProvider func(ctx context.Context, namespaceId namespace.ID, workflowId string) (historyi.Engine, error), remoteHistoryFetcher HistoryPaginatedFetcher, importer EventImporter, logger log.Logger, diff --git a/service/history/replication/eventhandler/resend_handler_test.go b/service/history/replication/eventhandler/resend_handler_test.go index 54013955c9e..126f157f148 100644 --- a/service/history/replication/eventhandler/resend_handler_test.go +++ b/service/history/replication/eventhandler/resend_handler_test.go @@ -49,7 +49,7 @@ import ( "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" ) @@ -73,7 +73,7 @@ type ( logger log.Logger config *configs.Config resendHandler ResendHandler - engine *shard.MockEngine + engine *historyi.MockEngine historyFetcher *MockHistoryPaginatedFetcher importer *MockEventImporter } @@ -124,7 +124,7 @@ func (s *resendHandlerSuite) SetupTest() { ) s.mockNamespaceCache.EXPECT().GetNamespaceByID(s.namespaceID).Return(namespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(s.namespace).Return(namespaceEntry, nil).AnyTimes() - s.engine = shard.NewMockEngine(s.controller) + s.engine = historyi.NewMockEngine(s.controller) s.serializer = serialization.NewSerializer() s.importer = NewMockEventImporter(s.controller) s.resendHandler = NewResendHandler( @@ -132,7 +132,7 @@ func (s *resendHandlerSuite) SetupTest() { s.mockClientBean, s.serializer, s.mockClusterMetadata, - func(ctx context.Context, namespaceId namespace.ID, workflowId string) (shard.Engine, error) { + func(ctx context.Context, namespaceId namespace.ID, workflowId string) (historyi.Engine, error) { return s.engine, nil }, s.historyFetcher, diff --git a/service/history/replication/executable_activity_state_task_test.go b/service/history/replication/executable_activity_state_task_test.go index cd52b917826..49ae8027909 100644 --- a/service/history/replication/executable_activity_state_task_test.go +++ b/service/history/replication/executable_activity_state_task_test.go @@ -51,6 +51,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -169,7 +170,7 @@ func (s *executableActivityStateTaskSuite) TestExecute_Process() { ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, @@ -230,7 +231,7 @@ func (s *executableActivityStateTaskSuite) TestHandleErr_Resend_Success() { uuid.NewString(), true, nil, ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, @@ -391,7 +392,7 @@ func (s *executableActivityStateTaskSuite) TestBatchedTask_ShouldBatchTogether_A uuid.NewString(), true, nil, ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(namespaceId), workflowId, diff --git a/service/history/replication/executable_backfill_history_events_task.go b/service/history/replication/executable_backfill_history_events_task.go index 9ebae8c037c..887fe9af44f 100644 --- a/service/history/replication/executable_backfill_history_events_task.go +++ b/service/history/replication/executable_backfill_history_events_task.go @@ -38,7 +38,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" ) type ( @@ -138,7 +138,7 @@ func (e *ExecutableBackfillHistoryEventsTask) Execute() error { return err } - return engine.BackfillHistoryEvents(ctx, &shard.BackfillHistoryEventsRequest{ + return engine.BackfillHistoryEvents(ctx, &historyi.BackfillHistoryEventsRequest{ WorkflowKey: e.WorkflowKey, SourceClusterName: e.SourceClusterName(), VersionedHistory: e.ReplicationTask().VersionedTransition, diff --git a/service/history/replication/executable_backfill_history_events_task_test.go b/service/history/replication/executable_backfill_history_events_task_test.go index 581314018b0..0cad3642f8b 100644 --- a/service/history/replication/executable_backfill_history_events_task_test.go +++ b/service/history/replication/executable_backfill_history_events_task_test.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/common/persistence/serialization" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -202,14 +203,14 @@ func (s *executableBackfillHistoryEventsTaskSuite) TestExecute_Process() { ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, ).Return(shardContext, nil).AnyTimes() shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() - engine.EXPECT().BackfillHistoryEvents(gomock.Any(), &shard.BackfillHistoryEventsRequest{ + engine.EXPECT().BackfillHistoryEvents(gomock.Any(), &historyi.BackfillHistoryEventsRequest{ WorkflowKey: definition.WorkflowKey{ NamespaceID: s.task.NamespaceID, WorkflowID: s.task.WorkflowID, @@ -261,7 +262,7 @@ func (s *executableBackfillHistoryEventsTaskSuite) TestHandleErr_Resend_Success( uuid.NewString(), true, nil, ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, diff --git a/service/history/replication/executable_history_task_test.go b/service/history/replication/executable_history_task_test.go index 0f08675692a..5455c35540d 100644 --- a/service/history/replication/executable_history_task_test.go +++ b/service/history/replication/executable_history_task_test.go @@ -53,6 +53,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/common/xdc" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -222,7 +223,7 @@ func (s *executableHistoryTaskSuite) TestExecute_Process() { ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, @@ -275,7 +276,7 @@ func (s *executableHistoryTaskSuite) TestHandleErr_Resend_Success() { uuid.NewString(), true, nil, ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, diff --git a/service/history/replication/executable_sync_hsm_task.go b/service/history/replication/executable_sync_hsm_task.go index f9ad26dab95..9c58ba57964 100644 --- a/service/history/replication/executable_sync_hsm_task.go +++ b/service/history/replication/executable_sync_hsm_task.go @@ -37,7 +37,7 @@ import ( "go.temporal.io/server/common/namespace" serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -133,7 +133,7 @@ func (e *ExecutableSyncHSMTask) Execute() error { if err != nil { return err } - return engine.SyncHSM(ctx, &shard.SyncHSMRequest{ + return engine.SyncHSM(ctx, &historyi.SyncHSMRequest{ WorkflowKey: e.WorkflowKey, StateMachineNode: e.taskAttr.StateMachineNode, EventVersionHistory: e.taskAttr.VersionHistory, diff --git a/service/history/replication/executable_sync_hsm_task_test.go b/service/history/replication/executable_sync_hsm_task_test.go index 076f4584b5b..1353055ec89 100644 --- a/service/history/replication/executable_sync_hsm_task_test.go +++ b/service/history/replication/executable_sync_hsm_task_test.go @@ -45,6 +45,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -177,13 +178,13 @@ func (s *executableSyncHSMTaskSuite) TestExecute_Process() { ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, ).Return(shardContext, nil).AnyTimes() shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() - engine.EXPECT().SyncHSM(gomock.Any(), &shard.SyncHSMRequest{ + engine.EXPECT().SyncHSM(gomock.Any(), &historyi.SyncHSMRequest{ WorkflowKey: definition.WorkflowKey{ NamespaceID: s.task.NamespaceID, WorkflowID: s.task.WorkflowID, @@ -230,7 +231,7 @@ func (s *executableSyncHSMTaskSuite) TestHandleErr_Resend_Success() { uuid.NewString(), true, nil, ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, diff --git a/service/history/replication/executable_task.go b/service/history/replication/executable_task.go index a806585137d..5d2fca226e1 100644 --- a/service/history/replication/executable_task.go +++ b/service/history/replication/executable_task.go @@ -51,7 +51,7 @@ import ( "go.temporal.io/server/common/persistence/versionhistory" serviceerrors "go.temporal.io/server/common/serviceerror" ctasks "go.temporal.io/server/common/tasks" - "go.temporal.io/server/service/history/shard" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tasks" ) @@ -541,7 +541,7 @@ func (e *ExecutableTaskImpl) BackFillEvents( } applyFn := func() error { - backFillRequest := &shard.BackfillHistoryEventsRequest{ + backFillRequest := &historyi.BackfillHistoryEventsRequest{ WorkflowKey: workflowKey, SourceClusterName: e.SourceClusterName(), VersionedHistory: e.ReplicationTask().VersionedTransition, diff --git a/service/history/replication/executable_task_test.go b/service/history/replication/executable_task_test.go index ab9b2bd86f0..8d64e0dd7bb 100644 --- a/service/history/replication/executable_task_test.go +++ b/service/history/replication/executable_task_test.go @@ -61,6 +61,7 @@ import ( "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/consts" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -366,7 +367,7 @@ func (s *executableTaskSuite) TestResend_NotFound() { resendErr.EndEventVersion, ).Return(serviceerror.NewNotFound("")) shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(resendErr.NamespaceId), resendErr.WorkflowId, @@ -741,14 +742,14 @@ func (s *executableTaskSuite) TestBackFillEvents_Success() { endEventVersion, ).Return(fetcher) shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(workflowKey.NamespaceID), workflowKey.WorkflowID, ).Return(shardContext, nil).AnyTimes() shardContext.EXPECT().GetEngine(gomock.Any()).Return(engine, nil).AnyTimes() engine.EXPECT().BackfillHistoryEvents( - gomock.Any(), protomock.Eq(&shard.BackfillHistoryEventsRequest{ + gomock.Any(), protomock.Eq(&historyi.BackfillHistoryEventsRequest{ WorkflowKey: workflowKey, SourceClusterName: s.sourceCluster, VersionedHistory: s.task.replicationTask.VersionedTransition, @@ -756,7 +757,7 @@ func (s *executableTaskSuite) TestBackFillEvents_Success() { Events: [][]*historypb.HistoryEvent{eventBatchOriginal1}, })).Return(nil) engine.EXPECT().BackfillHistoryEvents( - gomock.Any(), protomock.Eq(&shard.BackfillHistoryEventsRequest{ + gomock.Any(), protomock.Eq(&historyi.BackfillHistoryEventsRequest{ WorkflowKey: workflowKey, SourceClusterName: s.sourceCluster, VersionedHistory: s.task.replicationTask.VersionedTransition, @@ -1165,7 +1166,7 @@ func (s *executableTaskSuite) TestSyncState() { }, nil).Times(1) shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(syncStateErr.NamespaceId), syncStateErr.WorkflowId, diff --git a/service/history/replication/executable_workflow_state_task_test.go b/service/history/replication/executable_workflow_state_task_test.go index 8cbe8099a22..4ae3239ba4b 100644 --- a/service/history/replication/executable_workflow_state_task_test.go +++ b/service/history/replication/executable_workflow_state_task_test.go @@ -46,6 +46,7 @@ import ( serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -159,7 +160,7 @@ func (s *executableWorkflowStateTaskSuite) TestExecute_Process() { ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, @@ -208,7 +209,7 @@ func (s *executableWorkflowStateTaskSuite) TestHandleErr_Resend_Success() { uuid.NewString(), true, nil, ).AnyTimes() shardContext := shard.NewMockContext(s.controller) - engine := shard.NewMockEngine(s.controller) + engine := historyi.NewMockEngine(s.controller) s.shardController.EXPECT().GetShardByNamespaceWorkflow( namespace.ID(s.task.NamespaceID), s.task.WorkflowID, diff --git a/service/history/replication/fx.go b/service/history/replication/fx.go index 8fefb8b4b4f..860a1072a96 100644 --- a/service/history/replication/fx.go +++ b/service/history/replication/fx.go @@ -43,6 +43,7 @@ import ( ctasks "go.temporal.io/server/common/tasks" "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/replication/eventhandler" "go.temporal.io/server/service/history/shard" @@ -111,7 +112,7 @@ func replicationTaskConverterFactoryProvider( config *configs.Config, ) SourceTaskConverterProvider { return func( - historyEngine shard.Engine, + historyEngine historyi.Engine, shardContext shard.Context, clientClusterName string, serializer serialization.Serializer, @@ -341,7 +342,7 @@ func resendHandlerProvider( clientBean, serializer, clusterMetadata, - func(ctx context.Context, namespaceId namespace.ID, workflowId string) (shard.Engine, error) { + func(ctx context.Context, namespaceId namespace.ID, workflowId string) (historyi.Engine, error) { shardContext, err := shardController.GetShardByNamespaceWorkflow( namespaceId, workflowId, @@ -366,7 +367,7 @@ func eventImporterProvider( ) eventhandler.EventImporter { return eventhandler.NewEventImporter( historyFetcher, - func(ctx context.Context, namespaceId namespace.ID, workflowId string) (shard.Engine, error) { + func(ctx context.Context, namespaceId namespace.ID, workflowId string) (historyi.Engine, error) { shardContext, err := shardController.GetShardByNamespaceWorkflow( namespaceId, workflowId, diff --git a/service/history/replication/raw_task_converter.go b/service/history/replication/raw_task_converter.go index 0fb0e3d7ef6..ffe52f11620 100644 --- a/service/history/replication/raw_task_converter.go +++ b/service/history/replication/raw_task_converter.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/persistence/versionhistory" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/workflow" @@ -55,7 +56,7 @@ import ( type ( SourceTaskConverterImpl struct { - historyEngine shard.Engine + historyEngine historyi.Engine namespaceCache namespace.Registry serializer serialization.Serializer config *configs.Config @@ -64,7 +65,7 @@ type ( Convert(task tasks.Task, targetClusterID int32) (*replicationspb.ReplicationTask, error) } SourceTaskConverterProvider func( - historyEngine shard.Engine, + historyEngine historyi.Engine, shardContext shard.Context, clientClusterName string, // Some task converter may use the client cluster name. serializer serialization.Serializer, @@ -83,7 +84,7 @@ type ( ) func NewSourceTaskConverter( - historyEngine shard.Engine, + historyEngine historyi.Engine, namespaceCache namespace.Registry, serializer serialization.Serializer, config *configs.Config, diff --git a/service/history/replication/raw_task_converter_test.go b/service/history/replication/raw_task_converter_test.go index eeb97eb3815..3593f736973 100644 --- a/service/history/replication/raw_task_converter_test.go +++ b/service/history/replication/raw_task_converter_test.go @@ -53,6 +53,7 @@ import ( "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/service/history/hsm" "go.temporal.io/server/service/history/hsm/hsmtest" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -71,7 +72,7 @@ type ( controller *gomock.Controller shardContext *shard.ContextTest workflowCache *wcache.MockCache - mockEngine *shard.MockEngine + mockEngine *historyi.MockEngine progressCache *MockProgressCache executionManager *persistence.MockExecutionManager syncStateRetriever *MockSyncStateRetriever @@ -148,7 +149,7 @@ func (s *rawTaskConverterSuite) SetupTest() { s.executionManager = s.shardContext.Resource.ExecutionMgr s.logger = s.shardContext.GetLogger() - s.mockEngine = shard.NewMockEngine(s.controller) + s.mockEngine = historyi.NewMockEngine(s.controller) s.mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes() s.mockEngine.EXPECT().Stop().AnyTimes() s.shardContext.SetEngineForTesting(s.mockEngine) diff --git a/service/history/replication/stream_receiver_monitor_test.go b/service/history/replication/stream_receiver_monitor_test.go index 0f244ebd1d1..065733f764d 100644 --- a/service/history/replication/stream_receiver_monitor_test.go +++ b/service/history/replication/stream_receiver_monitor_test.go @@ -43,6 +43,7 @@ import ( "go.temporal.io/server/common/log" "go.temporal.io/server/common/metrics" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.uber.org/mock/gomock" @@ -498,8 +499,8 @@ func (s *streamReceiverMonitorSuite) TestGenerateStatusMap_Success() { inboundKeys[key3] = struct{}{} ctx1 := shard.NewMockContext(s.controller) ctx2 := shard.NewMockContext(s.controller) - engine1 := shard.NewMockEngine(s.controller) - engine2 := shard.NewMockEngine(s.controller) + engine1 := historyi.NewMockEngine(s.controller) + engine2 := historyi.NewMockEngine(s.controller) engine1.EXPECT().GetMaxReplicationTaskInfo().Return(int64(1000), time.Now()) engine2.EXPECT().GetMaxReplicationTaskInfo().Return(int64(2000), time.Now()) readerId1 := shard.ReplicationReaderIDFromClusterShardID(int64(key1.Client.ClusterID), key1.Client.ShardID) diff --git a/service/history/replication/stream_sender.go b/service/history/replication/stream_sender.go index 961b6c9d992..9caa02b92e1 100644 --- a/service/history/replication/stream_sender.go +++ b/service/history/replication/stream_sender.go @@ -49,6 +49,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/primitives/timestamp" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "google.golang.org/protobuf/types/known/timestamppb" @@ -65,7 +66,7 @@ type ( StreamSenderImpl struct { server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer shardContext shard.Context - historyEngine shard.Engine + historyEngine historyi.Engine taskConverter SourceTaskConverter metrics metrics.Handler logger log.Logger @@ -85,7 +86,7 @@ type ( func NewStreamSender( server historyservice.HistoryService_StreamWorkflowReplicationMessagesServer, shardContext shard.Context, - historyEngine shard.Engine, + historyEngine historyi.Engine, taskConverter SourceTaskConverter, clientClusterName string, clientClusterShardCount int32, diff --git a/service/history/replication/stream_sender_test.go b/service/history/replication/stream_sender_test.go index e70febf9650..3b811a45b8c 100644 --- a/service/history/replication/stream_sender_test.go +++ b/service/history/replication/stream_sender_test.go @@ -46,6 +46,7 @@ import ( "go.temporal.io/server/common/persistence" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -61,7 +62,7 @@ type ( controller *gomock.Controller server *historyservicemock.MockHistoryService_StreamWorkflowReplicationMessagesServer shardContext *shard.MockContext - historyEngine *shard.MockEngine + historyEngine *historyi.MockEngine taskConverter *MockSourceTaskConverter clientShardKey ClusterShardKey @@ -90,7 +91,7 @@ func (s *streamSenderSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.server = historyservicemock.NewMockHistoryService_StreamWorkflowReplicationMessagesServer(s.controller) s.shardContext = shard.NewMockContext(s.controller) - s.historyEngine = shard.NewMockEngine(s.controller) + s.historyEngine = historyi.NewMockEngine(s.controller) s.taskConverter = NewMockSourceTaskConverter(s.controller) s.config = tests.NewDynamicConfig() diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 5c5e5d8ddee..d8e2d96d9e4 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -53,6 +53,7 @@ import ( "go.temporal.io/server/common/quotas" serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -82,7 +83,7 @@ type ( sourceCluster string sourceShardID int32 shard shard.Context - historyEngine shard.Engine + historyEngine historyi.Engine historySerializer serialization.Serializer config *configs.Config metricsHandler metrics.Handler @@ -116,7 +117,7 @@ type ( func NewTaskProcessor( sourceShardID int32, shard shard.Context, - historyEngine shard.Engine, + historyEngine historyi.Engine, config *configs.Config, metricsHandler metrics.Handler, replicationTaskFetcher taskFetcher, diff --git a/service/history/replication/task_processor_manager.go b/service/history/replication/task_processor_manager.go index 07c01ed0d06..cbacd688769 100644 --- a/service/history/replication/task_processor_manager.go +++ b/service/history/replication/task_processor_manager.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/common/xdc" "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/deletemanager" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" wcache "go.temporal.io/server/service/history/workflow/cache" @@ -62,7 +63,7 @@ type ( taskProcessorManagerImpl struct { config *configs.Config deleteMgr deletemanager.DeleteManager - engine shard.Engine + engine historyi.Engine eventSerializer serialization.Serializer shard shard.Context status int32 @@ -86,7 +87,7 @@ type ( func NewTaskProcessorManager( config *configs.Config, shard shard.Context, - engine shard.Engine, + engine historyi.Engine, workflowCache wcache.Cache, workflowDeleteManager deletemanager.DeleteManager, clientBean client.Bean, diff --git a/service/history/replication/task_processor_manager_test.go b/service/history/replication/task_processor_manager_test.go index 94ceaebd64c..264cb7982b9 100644 --- a/service/history/replication/task_processor_manager_test.go +++ b/service/history/replication/task_processor_manager_test.go @@ -43,6 +43,7 @@ import ( "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -56,7 +57,7 @@ type ( controller *gomock.Controller mockShard *shard.MockContext - mockEngine *shard.MockEngine + mockEngine *historyi.MockEngine mockClientBean *client.MockBean mockClusterMetadata *cluster.MockMetadata mockHistoryClient *historyservicemock.MockHistoryServiceClient @@ -95,7 +96,7 @@ func (s *taskProcessorManagerSuite) SetupTest() { s.shardID = rand.Int31() s.shardOwner = "test-shard-owner" s.mockShard = shard.NewMockContext(s.controller) - s.mockEngine = shard.NewMockEngine(s.controller) + s.mockEngine = historyi.NewMockEngine(s.controller) s.mockClientBean = client.NewMockBean(s.controller) s.mockReplicationTaskExecutor = NewMockTaskExecutor(s.controller) diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index 0def51cb451..73350663cc9 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -55,6 +55,7 @@ import ( "go.temporal.io/server/common/resourcetest" "go.temporal.io/server/common/testing/protorequire" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -70,7 +71,7 @@ type ( controller *gomock.Controller mockResource *resourcetest.Test mockShard *shard.ContextTest - mockEngine *shard.MockEngine + mockEngine *historyi.MockEngine mockNamespaceCache *namespace.MockRegistry mockClientBean *client.MockBean mockAdminClient *adminservicemock.MockAdminServiceClient @@ -117,7 +118,7 @@ func (s *taskProcessorSuite) SetupTest() { }, s.config, ) - s.mockEngine = shard.NewMockEngine(s.controller) + s.mockEngine = historyi.NewMockEngine(s.controller) s.mockResource = s.mockShard.Resource s.mockNamespaceCache = s.mockResource.NamespaceCache s.mockClientBean = s.mockResource.ClientBean diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 27992d834c6..e100548d9c9 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -48,6 +48,7 @@ import ( "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tasks" ) @@ -77,7 +78,7 @@ type ( GetSearchAttributesMapperProvider() searchattribute.MapperProvider GetArchivalMetadata() archiver.ArchivalMetadata - GetEngine(ctx context.Context) (Engine, error) + GetEngine(ctx context.Context) (historyi.Engine, error) AssertOwnership(ctx context.Context) error NewVectorClock() (*clockspb.VectorClock, error) diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index f04d2df32e2..31f430de30b 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -74,6 +74,7 @@ import ( "go.temporal.io/server/service/history/consts" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/vclock" "google.golang.org/protobuf/types/known/timestamppb" @@ -116,7 +117,7 @@ type ( contextTaggedLogger log.Logger throttledLogger log.Logger engineFactory EngineFactory - engineFuture *future.FutureImpl[Engine] + engineFuture *future.FutureImpl[historyi.Engine] queueMetricEmitter sync.Once finalizer *finalizer.Finalizer @@ -186,7 +187,7 @@ type ( contextRequest interface{} contextRequestAcquire struct{} - contextRequestAcquired struct{ engine Engine } + contextRequestAcquired struct{ engine historyi.Engine } contextRequestLost struct{} contextRequestStop struct{ reason stopReason } contextRequestFinishStop struct{} @@ -277,7 +278,7 @@ func (s *ContextImpl) GetPingChecks() []pingable.Check { func (s *ContextImpl) GetEngine( ctx context.Context, -) (Engine, error) { +) (historyi.Engine, error) { return s.engineFuture.Get(ctx) } @@ -1460,7 +1461,7 @@ func (s *ContextImpl) maybeRecordShardAcquisitionLatency(ownershipChanged bool) } } -func (s *ContextImpl) createEngine() Engine { +func (s *ContextImpl) createEngine() historyi.Engine { s.contextTaggedLogger.Info("", tag.LifeCycleStarting, tag.ComponentShardEngine) engine := s.engineFactory.CreateEngine(s) engine.Start() @@ -1986,7 +1987,7 @@ func (s *ContextImpl) acquireShard() { s.contextTaggedLogger.Info("Acquired shard") // The first time we get the shard, we have to create the engine - var engine Engine + var engine historyi.Engine if !s.engineFuture.Ready() { s.maybeRecordShardAcquisitionLatency(ownershipChanged) engine = s.createEngine() @@ -2120,7 +2121,7 @@ func newContext( handoverNamespaces: make(map[namespace.Name]*namespaceHandOverInfo), lifecycleCtx: lifecycleCtx, lifecycleCancel: lifecycleCancel, - engineFuture: future.NewFuture[Engine](), + engineFuture: future.NewFuture[historyi.Engine](), queueMetricEmitter: sync.Once{}, ioSemaphore: locks.NewPrioritySemaphore(ioConcurrency), stateMachineRegistry: stateMachineRegistry, diff --git a/service/history/shard/context_mock.go b/service/history/shard/context_mock.go index 5f69f9d50f8..fccffe6c287 100644 --- a/service/history/shard/context_mock.go +++ b/service/history/shard/context_mock.go @@ -58,6 +58,7 @@ import ( configs "go.temporal.io/server/service/history/configs" events "go.temporal.io/server/service/history/events" hsm "go.temporal.io/server/service/history/hsm" + interfaces "go.temporal.io/server/service/history/interfaces" tasks "go.temporal.io/server/service/history/tasks" gomock "go.uber.org/mock/gomock" ) @@ -302,10 +303,10 @@ func (mr *MockContextMockRecorder) GetCurrentTime(cluster any) *gomock.Call { } // GetEngine mocks base method. -func (m *MockContext) GetEngine(ctx context.Context) (Engine, error) { +func (m *MockContext) GetEngine(ctx context.Context) (interfaces.Engine, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetEngine", ctx) - ret0, _ := ret[0].(Engine) + ret0, _ := ret[0].(interfaces.Engine) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -1030,10 +1031,10 @@ func (mr *MockControllableContextMockRecorder) GetCurrentTime(cluster any) *gomo } // GetEngine mocks base method. -func (m *MockControllableContext) GetEngine(ctx context.Context) (Engine, error) { +func (m *MockControllableContext) GetEngine(ctx context.Context) (interfaces.Engine, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetEngine", ctx) - ret0, _ := ret[0].(Engine) + ret0, _ := ret[0].(interfaces.Engine) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/history/shard/context_test.go b/service/history/shard/context_test.go index a99015b9c63..202706cc9a2 100644 --- a/service/history/shard/context_test.go +++ b/service/history/shard/context_test.go @@ -44,6 +44,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -62,7 +63,7 @@ type ( mockShardManager *persistence.MockShardManager mockExecutionManager *persistence.MockExecutionManager mockNamespaceCache *namespace.MockRegistry - mockHistoryEngine *MockEngine + mockHistoryEngine *historyi.MockEngine timeSource *clock.EventTimeSource } @@ -103,7 +104,7 @@ func (s *contextSuite) SetupTest() { s.mockExecutionManager = shardContext.Resource.ExecutionMgr s.mockShardManager = shardContext.Resource.ShardMgr - s.mockHistoryEngine = NewMockEngine(s.controller) + s.mockHistoryEngine = historyi.NewMockEngine(s.controller) shardContext.engineFuture.Set(s.mockHistoryEngine, nil) } diff --git a/service/history/shard/context_testutil.go b/service/history/shard/context_testutil.go index 0f9b6e406d6..7d27a564984 100644 --- a/service/history/shard/context_testutil.go +++ b/service/history/shard/context_testutil.go @@ -44,6 +44,7 @@ import ( "go.temporal.io/server/service/history/configs" "go.temporal.io/server/service/history/events" "go.temporal.io/server/service/history/hsm" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tasks" "go.uber.org/mock/gomock" ) @@ -103,13 +104,13 @@ type ContextConfigOverrides struct { type StubContext struct { ContextTest - engine Engine + engine historyi.Engine } func NewStubContext( ctrl *gomock.Controller, overrides ContextConfigOverrides, - engine Engine, + engine historyi.Engine, ) *StubContext { resourceTest := resourcetest.NewTest(ctrl, primitives.HistoryService) eventsCache := events.NewMockCache(ctrl) @@ -162,7 +163,7 @@ func newTestContext(t *resourcetest.Test, eventsCache events.Cache, config Conte queueMetricEmitter: sync.Once{}, state: contextStateAcquired, - engineFuture: future.NewFuture[Engine](), + engineFuture: future.NewFuture[historyi.Engine](), shardInfo: config.ShardInfo, remoteClusterInfos: make(map[string]*remoteClusterInfo), handoverNamespaces: make(map[namespace.Name]*namespaceHandOverInfo), @@ -196,7 +197,7 @@ func newTestContext(t *resourcetest.Test, eventsCache events.Cache, config Conte } // SetEngineForTest sets s.engine. Only used by tests. -func (s *ContextTest) SetEngineForTesting(engine Engine) { +func (s *ContextTest) SetEngineForTesting(engine historyi.Engine) { s.engineFuture.Set(engine, nil) } @@ -234,6 +235,6 @@ func (s *ContextTest) StopForTest() { s.FinishStop() } -func (s *StubContext) GetEngine(_ context.Context) (Engine, error) { +func (s *StubContext) GetEngine(_ context.Context) (historyi.Engine, error) { return s.engine, nil } diff --git a/service/history/shard/controller_test.go b/service/history/shard/controller_test.go index ff10662928e..b5d77fb0432 100644 --- a/service/history/shard/controller_test.go +++ b/service/history/shard/controller_test.go @@ -55,6 +55,7 @@ import ( "go.temporal.io/server/common/resourcetest" "go.temporal.io/server/internal/goro" "go.temporal.io/server/service/history/configs" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -68,7 +69,7 @@ type ( controller *gomock.Controller mockResource *resourcetest.Test - mockHistoryEngine *MockEngine + mockHistoryEngine *historyi.MockEngine mockClusterMetadata *cluster.MockMetadata mockServiceResolver *membership.MockServiceResolver @@ -133,7 +134,7 @@ func (s *controllerSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockResource = resourcetest.NewTest(s.controller, primitives.HistoryService) - s.mockHistoryEngine = NewMockEngine(s.controller) + s.mockHistoryEngine = historyi.NewMockEngine(s.controller) s.mockEngineFactory = NewMockEngineFactory(s.controller) s.mockShardManager = s.mockResource.ShardMgr @@ -171,12 +172,12 @@ func (s *controllerSuite) TestAcquireShardSuccess() { s.config.NumberOfShards = numShards var myShards []int32 - historyEngines := make(map[int32]*MockEngine) + historyEngines := make(map[int32]*historyi.MockEngine) for shardID := int32(1); shardID <= numShards; shardID++ { hostID := shardID % 4 if hostID == 0 { myShards = append(myShards, shardID) - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) } else { @@ -207,12 +208,12 @@ func (s *controllerSuite) TestAcquireShardsConcurrently() { } var myShards []int32 - historyEngines := make(map[int32]*MockEngine) + historyEngines := make(map[int32]*historyi.MockEngine) for shardID := int32(1); shardID <= numShards; shardID++ { hostID := shardID % 4 if hostID == 0 { myShards = append(myShards, shardID) - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) } else { @@ -257,9 +258,9 @@ func (s *controllerSuite) TestAcquireShardRenewSuccess() { numShards := int32(2) s.config.NumberOfShards = numShards - historyEngines := make(map[int32]*MockEngine) + historyEngines := make(map[int32]*historyi.MockEngine) for shardID := int32(1); shardID <= numShards; shardID++ { - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) } @@ -287,9 +288,9 @@ func (s *controllerSuite) TestAcquireShardRenewLookupFailed() { numShards := int32(2) s.config.NumberOfShards = numShards - historyEngines := make(map[int32]*MockEngine) + historyEngines := make(map[int32]*historyi.MockEngine) for shardID := int32(1); shardID <= numShards; shardID++ { - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) } @@ -323,9 +324,9 @@ func (s *controllerSuite) TestHistoryEngineClosed() { s.mockHostInfoProvider, s.metricsTestHandler, ) - historyEngines := make(map[int32]*MockEngine) + historyEngines := make(map[int32]*historyi.MockEngine) for shardID := int32(1); shardID <= numShards; shardID++ { - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) } @@ -426,9 +427,9 @@ func (s *controllerSuite) TestShardControllerClosed() { s.metricsTestHandler, ) - historyEngines := make(map[int32]*MockEngine) + historyEngines := make(map[int32]*historyi.MockEngine) for shardID := int32(1); shardID <= numShards; shardID++ { - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) } @@ -471,7 +472,7 @@ func (s *controllerSuite) TestShardControllerClosed() { func (s *controllerSuite) TestShardExplicitUnload() { s.config.NumberOfShards = 1 - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) mockEngine.EXPECT().Stop().AnyTimes() s.setupMocksForAcquireShard(1, mockEngine, 5, 6, false) @@ -492,7 +493,7 @@ func (s *controllerSuite) TestShardExplicitUnload() { func (s *controllerSuite) TestShardExplicitUnloadCancelGetOrCreate() { s.config.NumberOfShards = 1 - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) mockEngine.EXPECT().Stop().AnyTimes() shardID := int32(1) @@ -533,7 +534,7 @@ func (s *controllerSuite) TestShardExplicitUnloadCancelGetOrCreate() { func (s *controllerSuite) TestShardExplicitUnloadCancelAcquire() { s.config.NumberOfShards = 1 - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) mockEngine.EXPECT().Stop().AnyTimes() shardID := int32(1) @@ -601,8 +602,8 @@ func (s *controllerSuite) TestShardControllerFuzz() { queueStates := s.queueStates() s.mockServiceResolver.EXPECT().Lookup(convert.Int32ToString(shardID)).Return(s.hostInfo, nil).AnyTimes() - s.mockEngineFactory.EXPECT().CreateEngine(contextMatcher(shardID)).DoAndReturn(func(shard Context) Engine { - mockEngine := NewMockEngine(disconnectedMockController) + s.mockEngineFactory.EXPECT().CreateEngine(contextMatcher(shardID)).DoAndReturn(func(shard Context) historyi.Engine { + mockEngine := historyi.NewMockEngine(disconnectedMockController) status := new(int32) // notification step is done after engine is created, so may not be called when test finishes mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).MaxTimes(2) @@ -732,8 +733,8 @@ func (s *controllerSuite) TestShardLingerTimeout() { timeLimit := 1 * time.Second s.config.ShardLingerTimeLimit = dynamicconfig.GetDurationPropertyFn(timeLimit) - historyEngines := make(map[int32]*MockEngine) - mockEngine := NewMockEngine(s.controller) + historyEngines := make(map[int32]*historyi.MockEngine) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6, true) s.mockShardManager.EXPECT().AssertShardOwnership(gomock.Any(), &persistence.AssertShardOwnershipRequest{ @@ -779,8 +780,8 @@ func (s *controllerSuite) TestShardLingerSuccess() { checkQPS := 5 s.config.ShardLingerOwnershipCheckQPS = dynamicconfig.GetIntPropertyFn(checkQPS) - historyEngines := make(map[int32]*MockEngine) - mockEngine := NewMockEngine(s.controller) + historyEngines := make(map[int32]*historyi.MockEngine) + mockEngine := historyi.NewMockEngine(s.controller) historyEngines[shardID] = mockEngine mockEngine.EXPECT().Start().MinTimes(1) @@ -879,7 +880,7 @@ func (s *controllerSuite) TestShardCounter() { // safe to call this multiple times throughout a test. func (s *controllerSuite) setupAndAcquireShards(numShards int) { s.config.NumberOfShards = int32(numShards) - mockEngine := NewMockEngine(s.controller) + mockEngine := historyi.NewMockEngine(s.controller) for shardID := 1; shardID <= numShards; shardID++ { s.setupMocksForAcquireShard(int32(shardID), mockEngine, 5, 6, false) } @@ -888,7 +889,7 @@ func (s *controllerSuite) setupAndAcquireShards(numShards int) { func (s *controllerSuite) setupMocksForAcquireShard( shardID int32, - mockEngine *MockEngine, + mockEngine *historyi.MockEngine, currentRangeID, newRangeID int64, required bool, ) { diff --git a/service/history/shard/engine_factory.go b/service/history/shard/engine_factory.go index 249d5850b56..ab9e6e79627 100644 --- a/service/history/shard/engine_factory.go +++ b/service/history/shard/engine_factory.go @@ -26,9 +26,11 @@ package shard +import historyi "go.temporal.io/server/service/history/interfaces" + type ( // EngineFactory is used to create an instance of sharded history engine EngineFactory interface { - CreateEngine(context Context) Engine + CreateEngine(context Context) historyi.Engine } ) diff --git a/service/history/shard/engine_factory_mock.go b/service/history/shard/engine_factory_mock.go index 84d4ae9fa7c..3d9cf1b10c5 100644 --- a/service/history/shard/engine_factory_mock.go +++ b/service/history/shard/engine_factory_mock.go @@ -36,6 +36,7 @@ package shard import ( reflect "reflect" + interfaces "go.temporal.io/server/service/history/interfaces" gomock "go.uber.org/mock/gomock" ) @@ -63,10 +64,10 @@ func (m *MockEngineFactory) EXPECT() *MockEngineFactoryMockRecorder { } // CreateEngine mocks base method. -func (m *MockEngineFactory) CreateEngine(context Context) Engine { +func (m *MockEngineFactory) CreateEngine(context Context) interfaces.Engine { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateEngine", context) - ret0, _ := ret[0].(Engine) + ret0, _ := ret[0].(interfaces.Engine) return ret0 } diff --git a/service/history/workflow/context_test.go b/service/history/workflow/context_test.go index a709cdbe4b6..1a04c17d97e 100644 --- a/service/history/workflow/context_test.go +++ b/service/history/workflow/context_test.go @@ -44,6 +44,7 @@ import ( "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/primitives/timestamp" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" "go.temporal.io/server/service/history/tests" @@ -77,7 +78,7 @@ func (s *contextSuite) SetupTest() { &persistencespb.ShardInfo{ShardId: 1}, configs, ) - mockEngine := shard.NewMockEngine(controller) + mockEngine := historyi.NewMockEngine(controller) mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes() mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes() s.mockShard.SetEngineForTesting(mockEngine) diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index a9399317c1f..90b01a2ab9f 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/service/history/events" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" ) @@ -566,7 +567,7 @@ func setWorkflowExecution( } func NotifyWorkflowSnapshotTasks( - engine shard.Engine, + engine historyi.Engine, workflowSnapshot *persistence.WorkflowSnapshot, ) { if workflowSnapshot == nil { @@ -576,7 +577,7 @@ func NotifyWorkflowSnapshotTasks( } func NotifyWorkflowMutationTasks( - engine shard.Engine, + engine historyi.Engine, workflowMutation *persistence.WorkflowMutation, ) { if workflowMutation == nil { @@ -586,7 +587,7 @@ func NotifyWorkflowMutationTasks( } func NotifyNewHistorySnapshotEvent( - engine shard.Engine, + engine historyi.Engine, workflowSnapshot *persistence.WorkflowSnapshot, ) error { @@ -625,7 +626,7 @@ func NotifyNewHistorySnapshotEvent( } func NotifyNewHistoryMutationEvent( - engine shard.Engine, + engine historyi.Engine, workflowMutation *persistence.WorkflowMutation, ) error { diff --git a/service/history/workflow/transaction_test.go b/service/history/workflow/transaction_test.go index 3038776b565..a54ef09617a 100644 --- a/service/history/workflow/transaction_test.go +++ b/service/history/workflow/transaction_test.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/persistence" "go.temporal.io/server/common/util" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" @@ -49,7 +50,7 @@ type ( controller *gomock.Controller mockShard *shard.MockContext - mockEngine *shard.MockEngine + mockEngine *historyi.MockEngine mockNamespaceCache *namespace.MockRegistry logger log.Logger @@ -68,7 +69,7 @@ func (s *transactionSuite) SetupTest() { s.controller = gomock.NewController(s.T()) s.mockShard = shard.NewMockContext(s.controller) - s.mockEngine = shard.NewMockEngine(s.controller) + s.mockEngine = historyi.NewMockEngine(s.controller) s.mockNamespaceCache = namespace.NewMockRegistry(s.controller) s.logger = log.NewTestLogger() diff --git a/tests/add_tasks_test.go b/tests/add_tasks_test.go index b0d3c793f20..7e677c62cee 100644 --- a/tests/add_tasks_test.go +++ b/tests/add_tasks_test.go @@ -44,6 +44,7 @@ import ( "go.temporal.io/server/common/debug" "go.temporal.io/server/common/persistence/serialization" "go.temporal.io/server/common/primitives" + historyi "go.temporal.io/server/service/history/interfaces" "go.temporal.io/server/service/history/queues" "go.temporal.io/server/service/history/shard" "go.temporal.io/server/service/history/tasks" @@ -103,7 +104,7 @@ func (c *faultyShardController) GetShardByID(shardID int32) (shard.Context, erro return &faultyShardContext{Context: ctx, suite: c.s}, nil } -func (c *faultyShardContext) GetEngine(ctx context.Context) (shard.Engine, error) { +func (c *faultyShardContext) GetEngine(ctx context.Context) (historyi.Engine, error) { err := c.suite.getEngineErr.Load() if err != nil && *err != nil { return nil, *err