Skip to content

Commit

Permalink
IWF-274: fix prune split logic and tests to pass timer config through…
Browse files Browse the repository at this point in the history
… to interpreter
  • Loading branch information
jbowers committed Jan 14, 2025
1 parent a6399df commit b1fef0a
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 42 deletions.
39 changes: 39 additions & 0 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,45 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) {
}

// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true
func TestGreedyTimerWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumGreedyTimerConfig(true, false))
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowCadence(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeCadence, minimumGreedyTimerConfig(true, false))
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowTemporalContinueAsNew(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeTemporal, minimumContinueAsNewConfigV0())
smallWaitForFastTest()
}
}

func TestGreedyTimerWorkflowCadenceContinueAsNew(t *testing.T) {
if !*cadenceIntegTest {
t.Skip()
}
for i := 0; i < *repeatIntegTest; i++ {
doTestTimerWorkflow(t, service.BackendTypeCadence, minimumContinueAsNewConfigV0())
smallWaitForFastTest()
}
}

func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
Expand Down
13 changes: 13 additions & 0 deletions integ/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,19 @@ func minimumContinueAsNewConfig(optimizeActivity bool) *iwfidl.WorkflowConfig {
}
}

func minimumGreedyTimerConfig(optimizeTimer bool, continueAsNew bool) *iwfidl.WorkflowConfig {
if continueAsNew {
return &iwfidl.WorkflowConfig{
ContinueAsNewThreshold: iwfidl.PtrInt32(1),
OptimizeTimer: iwfidl.PtrBool(optimizeTimer),
}
}

return &iwfidl.WorkflowConfig{
OptimizeTimer: iwfidl.PtrBool(optimizeTimer),
}
}

func minimumContinueAsNewConfigV0() *iwfidl.WorkflowConfig {
return minimumContinueAsNewConfig(false)
}
Expand Down
3 changes: 3 additions & 0 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ func overrideWorkflowConfig(configOverride iwfidl.WorkflowConfig, workflowConfig
if configOverride.OptimizeActivity != nil {
workflowConfig.OptimizeActivity = configOverride.OptimizeActivity
}
if configOverride.OptimizeTimer != nil {
workflowConfig.OptimizeTimer = configOverride.OptimizeTimer
}
}

func (s *serviceImpl) ApiV1WorkflowWaitForStateCompletion(
Expand Down
93 changes: 51 additions & 42 deletions service/interpreter/timers/greedyTimerProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
"github.com/indeedeng/iwf/service"
)

type sortedTimers struct {
status service.InternalTimerStatus
// Ordered slice of all timers being awaited on
timers []*service.TimerInfo
type TimerManager struct {
// Does not map to the timers actually created by the workflow provider
PendingScheduling []*service.TimerInfo
// timers created through the workflow provider that are going to fire
ScheduledTimerTimes []int64
}

type GreedyTimerProcessor struct {
pendingTimers sortedTimers
timerManger TimerManager
stateExecutionCurrentTimerInfos map[string][]*service.TimerInfo
staleSkipTimerSignals []service.StaleSkipTimerSignal
provider interfaces.WorkflowProvider
Expand All @@ -32,13 +33,13 @@ func NewGreedyTimerProcessor(

tp := &GreedyTimerProcessor{
provider: provider,
pendingTimers: sortedTimers{status: service.TimerPending},
timerManger: TimerManager{},
stateExecutionCurrentTimerInfos: map[string][]*service.TimerInfo{},
logger: provider.GetLogger(ctx),
staleSkipTimerSignals: staleSkipTimerSignals,
}

// start some single thread that manages timers
// start some single thread that manages PendingScheduling
tp.createGreedyTimerScheduler(ctx, continueAsNewCounter)

err := provider.SetQueryHandler(ctx, service.GetCurrentTimerInfosQueryType, func() (service.GetCurrentTimerInfosQueryResponse, error) {
Expand All @@ -52,14 +53,14 @@ func NewGreedyTimerProcessor(
return tp
}

func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) {
func (t *TimerManager) addTimer(toAdd *service.TimerInfo) {

if toAdd == nil || toAdd.Status != t.status {
if toAdd == nil || toAdd.Status != service.TimerPending {
panic("invalid timer added")
}

insertIndex := 0
for i, timer := range t.timers {
for i, timer := range t.PendingScheduling {
if toAdd.FiringUnixTimestampSeconds >= timer.FiringUnixTimestampSeconds {
// don't want dupes. Makes remove simpler
if toAdd == timer {
Expand All @@ -70,73 +71,79 @@ func (t *sortedTimers) addTimer(toAdd *service.TimerInfo) {
}
insertIndex = i + 1
}
t.timers = append(
t.timers[:insertIndex],
append([]*service.TimerInfo{toAdd}, t.timers[insertIndex:]...)...)
t.PendingScheduling = append(
t.PendingScheduling[:insertIndex],
append([]*service.TimerInfo{toAdd}, t.PendingScheduling[insertIndex:]...)...)
}

func (t *sortedTimers) removeTimer(toRemove *service.TimerInfo) {
for i, timer := range t.timers {
func (t *TimerManager) removeTimer(toRemove *service.TimerInfo) {
for i, timer := range t.PendingScheduling {
if toRemove == timer {
t.timers = append(t.timers[:i], t.timers[i+1:]...)
t.PendingScheduling = append(t.PendingScheduling[:i], t.PendingScheduling[i+1:]...)
return
}
}
}

func (t *sortedTimers) pruneToNextTimer(pruneTo int64) *service.TimerInfo {
func (t *TimerManager) pruneToNextTimer(pruneTo int64) *service.TimerInfo {

if len(t.timers) == 0 {
if len(t.PendingScheduling) == 0 {
return nil
}

index := len(t.timers)
index := len(t.PendingScheduling)

for i := len(t.timers) - 1; i >= 0; i-- {
timer := t.timers[i]
if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == t.status {
for i := len(t.PendingScheduling) - 1; i >= 0; i-- {
timer := t.PendingScheduling[i]
if timer.FiringUnixTimestampSeconds > pruneTo && timer.Status == service.TimerPending {
break
}
index = i
}
t.timers = t.timers[:index]
return t.timers[index-1]

// If index is 0, it means all timers are pruned
if index == 0 {
t.PendingScheduling = nil
return nil
}

prunedTimer := t.PendingScheduling[index-1]
t.PendingScheduling = t.PendingScheduling[:index]
return prunedTimer
}

func (t *GreedyTimerProcessor) createGreedyTimerScheduler(
ctx interfaces.UnifiedContext,
continueAsNewCounter *cont.ContinueAsNewCounter) {

t.provider.GoNamed(ctx, "greedy-timer-scheduler", func(ctx interfaces.UnifiedContext) {
// NOTE: next timer to fire is at the end of the slice
var createdTimers []int64
for {
t.provider.Await(ctx, func() bool {
// remove fired timers
_ = t.provider.Await(ctx, func() bool {
// remove fired PendingScheduling
now := t.provider.Now(ctx).Unix()
for i := len(createdTimers) - 1; i >= 0; i-- {
if createdTimers[i] > now {
createdTimers = createdTimers[:i+1]
for i := len(t.timerManger.ScheduledTimerTimes) - 1; i >= 0; i-- {
if t.timerManger.ScheduledTimerTimes[i] > now {
t.timerManger.ScheduledTimerTimes = t.timerManger.ScheduledTimerTimes[:i+1]
break
}
}
next := t.pendingTimers.pruneToNextTimer(now)
return (next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1])) || continueAsNewCounter.IsThresholdMet()
next := t.timerManger.pruneToNextTimer(now)
return (next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1])) || continueAsNewCounter.IsThresholdMet()
})

if continueAsNewCounter.IsThresholdMet() {
break
}

now := t.provider.Now(ctx).Unix()
next := t.pendingTimers.pruneToNextTimer(now)
//next := t.pendingTimers.getEarliestTimer()
next := t.timerManger.pruneToNextTimer(now)
//next := t.timerManger.getEarliestTimer()
// only create a new timer when a pending timer exists before the next existing timer fires
if next != nil && (len(createdTimers) == 0 || next.FiringUnixTimestampSeconds < createdTimers[len(createdTimers)-1]) {
if next != nil && (len(t.timerManger.ScheduledTimerTimes) == 0 || next.FiringUnixTimestampSeconds < t.timerManger.ScheduledTimerTimes[len(t.timerManger.ScheduledTimerTimes)-1]) {
fireAt := next.FiringUnixTimestampSeconds
duration := time.Duration(fireAt-now) * time.Second
t.provider.NewTimer(ctx, duration)
createdTimers = append(createdTimers, fireAt)
t.timerManger.ScheduledTimerTimes = append(t.timerManger.ScheduledTimerTimes, fireAt)
}
}
})
Expand Down Expand Up @@ -216,23 +223,23 @@ func (t *GreedyTimerProcessor) WaitForTimerFiredOrSkipped(
return service.TimerSkipped
}

if timer.FiringUnixTimestampSeconds >= t.provider.Now(ctx).Unix() {
if timer.FiringUnixTimestampSeconds <= t.provider.Now(ctx).Unix() {
timer.Status = service.TimerFired
return service.TimerFired
}

// otherwise *cancelWaiting should return false to indicate that this timer isn't completed(fired or skipped)
t.pendingTimers.removeTimer(timer)
t.timerManger.removeTimer(timer)
return service.TimerPending
}

// RemovePendingTimersOfState is for when a state is completed, remove all its pending timers
// RemovePendingTimersOfState is for when a state is completed, remove all its pending PendingScheduling
func (t *GreedyTimerProcessor) RemovePendingTimersOfState(stateExeId string) {

timers := t.stateExecutionCurrentTimerInfos[stateExeId]

for _, timer := range timers {
t.pendingTimers.removeTimer(timer)
t.timerManger.removeTimer(timer)
}

delete(t.stateExecutionCurrentTimerInfos, stateExeId)
Expand All @@ -256,7 +263,9 @@ func (t *GreedyTimerProcessor) AddTimers(
FiringUnixTimestampSeconds: cmd.GetFiringUnixTimestampSeconds(),
Status: service.TimerPending,
}
t.pendingTimers.addTimer(&timer)
}
if timer.Status == service.TimerPending {
t.timerManger.addTimer(&timer)
}
timers[idx] = &timer
}
Expand Down
1 change: 1 addition & 0 deletions service/interpreter/timers/timerScheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package timers

0 comments on commit b1fef0a

Please sign in to comment.