Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IWF-274: Optimize Timer creation #529

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gen/iwfidl/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ components:
workflowConfigOverride:
disableSystemSearchAttribute: true
continueAsNewThreshold: 1
optimizeTimer: true
continueAsNewPageSizeInBytes: 4
executingStateIdMode: null
optimizeActivity: true
Expand Down Expand Up @@ -1011,6 +1012,7 @@ components:
workflowConfigOverride:
disableSystemSearchAttribute: true
continueAsNewThreshold: 1
optimizeTimer: true
continueAsNewPageSizeInBytes: 4
executingStateIdMode: null
optimizeActivity: true
Expand Down Expand Up @@ -1601,6 +1603,7 @@ components:
workflowConfig:
disableSystemSearchAttribute: true
continueAsNewThreshold: 1
optimizeTimer: true
continueAsNewPageSizeInBytes: 4
executingStateIdMode: null
optimizeActivity: true
Expand Down Expand Up @@ -3284,6 +3287,7 @@ components:
example:
disableSystemSearchAttribute: true
continueAsNewThreshold: 1
optimizeTimer: true
continueAsNewPageSizeInBytes: 4
executingStateIdMode: null
optimizeActivity: true
Expand All @@ -3298,6 +3302,8 @@ components:
type: integer
optimizeActivity:
type: boolean
optimizeTimer:
type: boolean
type: object
Context:
example:
Expand Down
26 changes: 26 additions & 0 deletions gen/iwfidl/docs/WorkflowConfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Name | Type | Description | Notes
**ContinueAsNewThreshold** | Pointer to **int32** | | [optional]
**ContinueAsNewPageSizeInBytes** | Pointer to **int32** | | [optional]
**OptimizeActivity** | Pointer to **bool** | | [optional]
**OptimizeTimer** | Pointer to **bool** | | [optional]

## Methods

Expand Down Expand Up @@ -154,6 +155,31 @@ SetOptimizeActivity sets OptimizeActivity field to given value.

HasOptimizeActivity returns a boolean if a field has been set.

### GetOptimizeTimer

`func (o *WorkflowConfig) GetOptimizeTimer() bool`

GetOptimizeTimer returns the OptimizeTimer field if non-nil, zero value otherwise.

### GetOptimizeTimerOk

`func (o *WorkflowConfig) GetOptimizeTimerOk() (*bool, bool)`

GetOptimizeTimerOk returns a tuple with the OptimizeTimer field if it's non-nil, zero value otherwise
and a boolean to check if the value has been set.

### SetOptimizeTimer

`func (o *WorkflowConfig) SetOptimizeTimer(v bool)`

SetOptimizeTimer sets OptimizeTimer field to given value.

### HasOptimizeTimer

`func (o *WorkflowConfig) HasOptimizeTimer() bool`

HasOptimizeTimer returns a boolean if a field has been set.


[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)

Expand Down
36 changes: 36 additions & 0 deletions gen/iwfidl/model_workflow_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions integ/any_timer_signal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"
)

// TODO: crate greedy tests for cancelling timer early
func TestAnyTimerSignalWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
Expand Down Expand Up @@ -52,6 +53,8 @@ func TestAnyTimerSignalWorkflowCadenceContinueAsNew(t *testing.T) {
}
}

// TODO: crate greedy tests for cancelling timer early

func doTestAnyTimerSignalWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := anytimersignal.NewHandler()
Expand Down
3 changes: 3 additions & 0 deletions integ/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"
)

// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true
func TestTimerWorkflowTemporal(t *testing.T) {
if !*temporalIntegTest {
t.Skip()
Expand Down Expand Up @@ -55,6 +56,8 @@ func TestTimerWorkflowCadenceContinueAsNew(t *testing.T) {
}
}

// TODO: create greedy tests by copying these 4 tests and pass in OptimizeTimer: true

func doTestTimerWorkflow(t *testing.T, backendType service.BackendType, config *iwfidl.WorkflowConfig) {
// start test workflow server
wfHandler := timer.NewHandler()
Expand Down
2 changes: 1 addition & 1 deletion iwf-idl
Submodule iwf-idl updated 2 files
+2 −0 iwf-sdk.yaml
+2 −0 iwf.yaml
9 changes: 4 additions & 5 deletions service/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@ import (
"github.com/indeedeng/iwf/config"
"github.com/indeedeng/iwf/service/common/event"
"github.com/indeedeng/iwf/service/interpreter/env"
"github.com/indeedeng/iwf/service/interpreter/interfaces"
"net/http"
"os"
"strings"
"time"

uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/common/compatibility"
"github.com/indeedeng/iwf/service/common/rpc"
"github.com/indeedeng/iwf/service/common/utils"
"github.com/indeedeng/iwf/service/interpreter"

"github.com/indeedeng/iwf/service/common/errors"
"github.com/indeedeng/iwf/service/common/log"
"github.com/indeedeng/iwf/service/common/log/tag"
"github.com/indeedeng/iwf/service/common/mapper"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/common/rpc"
"github.com/indeedeng/iwf/service/common/utils"

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
Expand Down Expand Up @@ -801,7 +800,7 @@ func (s *serviceImpl) handleRpcBySynchronousUpdate(
ctx context.Context, req iwfidl.WorkflowRpcRequest,
) (resp *iwfidl.WorkflowRpcResponse, retError *errors.ErrorAndStatus) {
req.TimeoutSeconds = ptr.Any(utils.TrimRpcTimeoutSeconds(ctx, req))
var output interpreter.HandlerOutput
var output interfaces.HandlerOutput
err := s.client.SynchronousUpdateWorkflow(ctx, &output, req.GetWorkflowId(), req.GetWorkflowRunId(), service.ExecuteOptimisticLockingRpcUpdateType, req)
if err != nil {
errType := s.client.GetApplicationErrorTypeIfIsApplicationError(err)
Expand Down
21 changes: 11 additions & 10 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/indeedeng/iwf/service/common/rpc"
"github.com/indeedeng/iwf/service/common/urlautofix"
"github.com/indeedeng/iwf/service/interpreter/env"
"github.com/indeedeng/iwf/service/interpreter/interfaces"
"io"
"net/http"
"os"
Expand All @@ -30,7 +31,7 @@ func StateApiWaitUntil(
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput,
) (*iwfidl.WorkflowStateStartResponse, error) {
stateApiWaitUntilStartTime := time.Now().UnixMilli()
provider := getActivityProviderByType(backendType)
provider := interfaces.GetActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateWaitUntilActivity", "input", log.ToJsonAndTruncateForLogging(input))
iwfWorkerBaseUrl := urlautofix.FixWorkerUrl(input.IwfWorkerUrl)
Expand Down Expand Up @@ -115,7 +116,7 @@ func StateApiExecute(
input service.StateDecideActivityInput,
) (*iwfidl.WorkflowStateDecideResponse, error) {
stateApiExecuteStartTime := time.Now().UnixMilli()
provider := getActivityProviderByType(backendType)
provider := interfaces.GetActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateExecuteActivity", "input", log.ToJsonAndTruncateForLogging(input))

Expand Down Expand Up @@ -197,20 +198,20 @@ func checkStateDecisionFromResponse(resp *iwfidl.WorkflowStateDecideResponse) er
return nil
}

func printDebugMsg(logger UnifiedLogger, err error, url string) {
func printDebugMsg(logger interfaces.UnifiedLogger, err error, url string) {
debugMode := os.Getenv(service.EnvNameDebugMode)
if debugMode != "" {
logger.Info("check error at http request", err, url)
}
}

func composeStartApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error {
func composeStartApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateStartResponse) error {
respStr, _ := resp.MarshalJSON()
return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE),
fmt.Sprintf("err msg: %v, response: %v", err, string(respStr)))
}

func composeExecuteApiRespError(provider ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error {
func composeExecuteApiRespError(provider interfaces.ActivityProvider, err error, resp *iwfidl.WorkflowStateDecideResponse) error {
respStr, _ := resp.MarshalJSON()
return provider.NewApplicationError(string(iwfidl.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE),
fmt.Sprintf("err msg: %v, response: %v", err, string(respStr)))
Expand All @@ -224,7 +225,7 @@ func checkHttpError(err error, httpResp *http.Response) bool {
}

func composeHttpError(
isLocalActivity bool, provider ActivityProvider, err error, httpResp *http.Response, errType string,
isLocalActivity bool, provider interfaces.ActivityProvider, err error, httpResp *http.Response, errType string,
) error {
responseBody := "None"
var statusCode int
Expand Down Expand Up @@ -329,7 +330,7 @@ func listTimerSignalInternalChannelCommandIds(commandReq *iwfidl.CommandRequest)
func DumpWorkflowInternal(
ctx context.Context, backendType service.BackendType, req iwfidl.WorkflowDumpRequest,
) (*iwfidl.WorkflowDumpResponse, error) {
provider := getActivityProviderByType(backendType)
provider := interfaces.GetActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("DumpWorkflowInternalActivity", "input", log.ToJsonAndTruncateForLogging(req))

Expand Down Expand Up @@ -357,15 +358,15 @@ func DumpWorkflowInternal(
func InvokeWorkerRpc(
ctx context.Context, backendType service.BackendType, rpcPrep *service.PrepareRpcQueryResponse,
req iwfidl.WorkflowRpcRequest,
) (*InvokeRpcActivityOutput, error) {
provider := getActivityProviderByType(backendType)
) (*interfaces.InvokeRpcActivityOutput, error) {
provider := interfaces.GetActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("InvokeWorkerRpcActivity", "input", log.ToJsonAndTruncateForLogging(req))

apiMaxSeconds := env.GetSharedConfig().Api.MaxWaitSeconds

resp, statusErr := rpc.InvokeWorkerRpc(ctx, rpcPrep, req, apiMaxSeconds)
return &InvokeRpcActivityOutput{
return &interfaces.InvokeRpcActivityOutput{
RpcOutput: resp,
StatusError: statusErr,
}, nil
Expand Down
5 changes: 3 additions & 2 deletions service/interpreter/activityImpl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service/common/ptr"
"github.com/indeedeng/iwf/service/interpreter/interfaces"
"github.com/stretchr/testify/assert"
"io"
"net/http"
Expand Down Expand Up @@ -195,9 +196,9 @@ func TestComposeHttpError_RegularActivity_NilResponse(t *testing.T) {
assert.Equal(t, returnedError, err)
}

func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*MockActivityProvider, *http.Response, error) {
func createTestComposeHttpErrorInitialState(t *testing.T, httpError string, initialError string) (*interfaces.MockActivityProvider, *http.Response, error) {
ctrl := gomock.NewController(t)
mockActivityProvider := NewMockActivityProvider(ctrl)
mockActivityProvider := interfaces.NewMockActivityProvider(ctrl)

var httpResp *http.Response = nil
if httpError != "" {
Expand Down
12 changes: 6 additions & 6 deletions service/interpreter/cadence/activityProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,35 @@ package cadence
import (
"context"
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/interpreter"
"github.com/indeedeng/iwf/service/interpreter/interfaces"
"go.uber.org/cadence"
"go.uber.org/cadence/activity"
)

type activityProvider struct{}

func init() {
interpreter.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{})
interfaces.RegisterActivityProvider(service.BackendTypeCadence, &activityProvider{})
}

func (a *activityProvider) NewApplicationError(errType string, details interface{}) error {
return cadence.NewCustomError(errType, details)
}

func (a *activityProvider) GetLogger(ctx context.Context) interpreter.UnifiedLogger {
func (a *activityProvider) GetLogger(ctx context.Context) interfaces.UnifiedLogger {
zLogger := activity.GetLogger(ctx)
return &loggerImpl{
zlogger: zLogger,
}
}

func (a *activityProvider) GetActivityInfo(ctx context.Context) interpreter.ActivityInfo {
func (a *activityProvider) GetActivityInfo(ctx context.Context) interfaces.ActivityInfo {
info := activity.GetInfo(ctx)
return interpreter.ActivityInfo{
return interfaces.ActivityInfo{
ScheduledTime: info.ScheduledTimestamp,
Attempt: info.Attempt + 1, // NOTE increase by one to match Temporal
IsLocalActivity: false, // TODO cadence doesn't support this yet
WorkflowExecution: interpreter.WorkflowExecution{
WorkflowExecution: interfaces.WorkflowExecution{
ID: info.WorkflowExecution.ID,
RunID: info.WorkflowExecution.RunID,
},
Expand Down
5 changes: 3 additions & 2 deletions service/interpreter/cadence/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package cadence
import (
"github.com/indeedeng/iwf/service"
"github.com/indeedeng/iwf/service/interpreter"
"github.com/indeedeng/iwf/service/interpreter/interfaces"
"go.uber.org/cadence/workflow"
)

func Interpreter(ctx workflow.Context, input service.InterpreterWorkflowInput) (*service.InterpreterWorkflowOutput, error) {
return interpreter.InterpreterImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input)
return interpreter.InterpreterImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider(), input)
}

func WaitforStateCompletionWorkflow(ctx workflow.Context) (*service.WaitForStateCompletionWorkflowOutput, error) {
return interpreter.WaitForStateCompletionWorkflowImpl(interpreter.NewUnifiedContext(ctx), newCadenceWorkflowProvider())
return interpreter.WaitForStateCompletionWorkflowImpl(interfaces.NewUnifiedContext(ctx), newCadenceWorkflowProvider())
}
Loading
Loading