From b1fef0a3b268aca1524e369a2db484cb0837fd7f Mon Sep 17 00:00:00 2001 From: jbowers Date: Mon, 13 Jan 2025 22:56:51 -0600 Subject: [PATCH] IWF-274: fix prune split logic and tests to pass timer config through to interpreter --- integ/timer_test.go | 39 ++++++++ integ/util.go | 13 +++ service/api/service.go | 3 + .../timers/greedyTimerProcessor.go | 93 ++++++++++--------- service/interpreter/timers/timerScheduler.go | 1 + 5 files changed, 107 insertions(+), 42 deletions(-) create mode 100644 service/interpreter/timers/timerScheduler.go diff --git a/integ/timer_test.go b/integ/timer_test.go index bb1521c4..198cba7e 100644 --- a/integ/timer_test.go +++ b/integ/timer_test.go @@ -57,6 +57,45 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) { } // TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true +func TestGreedyTimerWorkflowTemporal(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig(true, false)) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowCadence(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig(true, false)) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowTemporalContinueAsNew(t *testing.T) { + if !*temporalIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0()) + smallWaitForFastTest() + } +} + +func TestGreedyTimerWorkflowCadenceContinueAsNew(t *testing.T) { + if !*cadenceIntegTest { + t.Skip() + } + for i := 0; i < *repeatIntegTest; i++ { + doTestTimerWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0()) + smallWaitForFastTest() + } +} func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) { // start test workflow server diff --git a/integ/util.go b/integ/util.go index 6fdef79e..f6d410e5 100644 --- a/integ/util.go +++ b/integ/util.go @@ -220,6 +220,19 @@ func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig { } } +func minimumGreedyTimerConfig(optimizeTimer bool, continueAsNew bool) *iwfidl.WorkflowConfig { + if continueAsNew { + return &iwfidl.WorkflowConfig{ + ContinueAsNewThreshold: iwfidl.PtrInt32(1), + OptimizeTimer: iwfidl.PtrBool(optimizeTimer), + } + } + + return &iwfidl.WorkflowConfig{ + OptimizeTimer: iwfidl.PtrBool(optimizeTimer), + } +} + func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig { return minimumContinueAsNewConfig(false) } diff --git a/service/api/service.go b/service/api/service.go index 2be54a4f..c48739d5 100644 --- a/service/api/service.go +++ b/service/api/service.go @@ -196,6 +196,9 @@ func overrideWorkflowConfig(configOverride iwfidl.WorkflowConfig, workflowConfig if configOverride.OptimizeActivity != nil { workflowConfig.OptimizeActivity = configOverride.OptimizeActivity } + if configOverride.OptimizeTimer != nil { + workflowConfig.OptimizeTimer = configOverride.OptimizeTimer + } } func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion( diff --git a/service/interpreter/timers/greedyTimerProcessor.go b/service/interpreter/timers/greedyTimerProcessor.go index 6883dd51..6cd8d8da 100644 --- a/service/interpreter/timers/greedyTimerProcessor.go +++ b/service/interpreter/timers/greedyTimerProcessor.go @@ -9,14 +9,15 @@ import ( "github.com/indeedeng/iwf/service" ) -type sortedTimers struct { - status service.InternalTimerStatus - // Ordered slice of all timers being awaited on - timers []*service.TimerInfo +type TimerManager struct { + // Does not map to the timers actually created by the workflow provider + PendingScheduling []*service.TimerInfo + // timers created through the workflow provider that are going to fire + ScheduledTimerTimes []int64 } type GreedyTimerProcessor struct { - pendingTimers sortedTimers + timerManger TimerManager stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo staleSkipTimerSignals []service.StaleSkipTimerSignal provider interfaces.WorkflowProvider @@ -32,13 +33,13 @@ func NewGreedyTimerProcessor( tp := &GreedyTimerProcessor{ provider: provider, - pendingTimers: sortedTimers{status: service.TimerPending}, + timerManger: TimerManager{}, stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{}, logger: provider.GetLogger(ctx), staleSkipTimerSignals: staleSkipTimerSignals, } - // start some single thread that manages timers + // start some single thread that manages PendingScheduling tp.createGreedyTimerScheduler(ctx, continueAsNewCounter) err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) { @@ -52,14 +53,14 @@ func NewGreedyTimerProcessor( return tp } -func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { +func (t *TimerManager) addTimer(toAdd *service.TimerInfo) { - if toAdd == nil || toAdd.Status != t.status { + if toAdd == nil || toAdd.Status != service.TimerPending { panic("invalid timer added") } insertIndex := 0 - for i, timer := range t.timers { + for i, timer := range t.PendingScheduling { if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds { // don't want dupes. Makes remove simpler if toAdd == timer { @@ -70,37 +71,45 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) { } insertIndex = i + 1 } - t.timers = append( - t.timers[:insertIndex], - append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...) + t.PendingScheduling = append( + t.PendingScheduling[:insertIndex], + append([]*service.TimerInfo{toAdd}, t.PendingScheduling[insertIndex:]...)...) } -func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) { - for i, timer := range t.timers { +func (t *TimerManager) removeTimer(toRemove *service.TimerInfo) { + for i, timer := range t.PendingScheduling { if toRemove == timer { - t.timers = append(t.timers[:i], t.timers[i+1:]...) + t.PendingScheduling = append(t.PendingScheduling[:i], t.PendingScheduling[i+1:]...) return } } } -func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo { +func (t *TimerManager) pruneToNextTimer(pruneTo int64) *service.TimerInfo { - if len(t.timers) == 0 { + if len(t.PendingScheduling) == 0 { return nil } - index := len(t.timers) + index := len(t.PendingScheduling) - for i := len(t.timers) - 1; i >= 0; i-- { - timer := t.timers[i] - if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == t.status { + for i := len(t.PendingScheduling) - 1; i >= 0; i-- { + timer := t.PendingScheduling[i] + if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == service.TimerPending { break } index = i } - t.timers = t.timers[:index] - return t.timers[index-1] + + // If index is 0, it means all timers are pruned + if index == 0 { + t.PendingScheduling = nil + return nil + } + + prunedTimer := t.PendingScheduling[index-1] + t.PendingScheduling = t.PendingScheduling[:index] + return prunedTimer } func (t *GreedyTimerProcessor) createGreedyTimerScheduler( @@ -108,20 +117,18 @@ func (t *GreedyTimerProcessor) createGreedyTimerScheduler( continueAsNewCounter *cont.ContinueAsNewCounter) { t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) { - // NOTE: next timer to fire is at the end of the slice - var createdTimers []int64 for { - t.provider.Await(ctx, func() bool { - // remove fired timers + _ = t.provider.Await(ctx, func() bool { + // remove fired PendingScheduling now := t.provider.Now(ctx).Unix() - for i := len(createdTimers) - 1; i >= 0; i-- { - if createdTimers[i] > now { - createdTimers = createdTimers[:i+1] + for i := len(t.timerManger.ScheduledTimerTimes) - 1; i >= 0; i-- { + if t.timerManger.ScheduledTimerTimes[i] > now { + t.timerManger.ScheduledTimerTimes = t.timerManger.ScheduledTimerTimes[:i+1] break } } - next := t.pendingTimers.pruneToNextTimer(now) - return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet() + next := t.timerManger.pruneToNextTimer(now) + return (next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1])) || continueAsNewCounter.IsThresholdMet() }) if continueAsNewCounter.IsThresholdMet() { @@ -129,14 +136,14 @@ func (t *GreedyTimerProcessor) createGreedyTimerScheduler( } now := t.provider.Now(ctx).Unix() - next := t.pendingTimers.pruneToNextTimer(now) - //next := t.pendingTimers.getEarliestTimer() + next := t.timerManger.pruneToNextTimer(now) + //next := t.timerManger.getEarliestTimer() // only create a new timer when a pending timer exists before the next existing timer fires - if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) { + if next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1]) { fireAt := next.FiringUnixTimestampSeconds duration := time.Duration(fireAt-now) * time.Second t.provider.NewTimer(ctx, duration) - createdTimers = append(createdTimers, fireAt) + t.timerManger.ScheduledTimerTimes = append(t.timerManger.ScheduledTimerTimes, fireAt) } } }) @@ -216,23 +223,23 @@ func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped( return service.TimerSkipped } - if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() { + if timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() { timer.Status = service.TimerFired return service.TimerFired } // otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped) - t.pendingTimers.removeTimer(timer) + t.timerManger.removeTimer(timer) return service.TimerPending } -// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers +// RemovePendingTimersOfState is for when a state is completed, remove all its pending PendingScheduling func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) { timers := t.stateExecutionCurrentTimerInfos[stateExeId] for _, timer := range timers { - t.pendingTimers.removeTimer(timer) + t.timerManger.removeTimer(timer) } delete(t.stateExecutionCurrentTimerInfos, stateExeId) @@ -256,7 +263,9 @@ func (t *GreedyTimerProcessor) AddTimers( FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(), Status: service.TimerPending, } - t.pendingTimers.addTimer(&timer) + } + if timer.Status == service.TimerPending { + t.timerManger.addTimer(&timer) } timers[idx] = &timer } diff --git a/service/interpreter/timers/timerScheduler.go b/service/interpreter/timers/timerScheduler.go new file mode 100644 index 00000000..4eba3151 --- /dev/null +++ b/service/interpreter/timers/timerScheduler.go @@ -0,0 +1 @@ +package timers