Skip to content

Commit

Permalink
update the design for workflow framework
Browse files Browse the repository at this point in the history
  • Loading branch information
KunWuLuan committed Sep 30, 2024
1 parent 012eabe commit 1ed89ab
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
144 changes: 114 additions & 30 deletions keps/74-support-argo-workflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -406,53 +406,137 @@ follow the same api to queue their workflows.
### Stage as An Unit
#### Workflow Framework in Kueue
#### SegmentableJob (Not sure the name)
When we regard stage as an unit, multiple workloads will be created for one workflow. So the current
job framework is not suitable for this case. We need to create a new `workflow framework` for the
workflows. The generic workflow framework reconcile workflows in following way:
job types are not suitable for this case. We will create a new kind of job interface called SegmentableJob.
And workload can be integrated with Kueue by implement their SegmentableJob interface.
``` go
type GenericWorkflow interface {
GetActiveStages() []GenericJob
package segmentablejob

import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
)

type SegmentableJob interface {
jobframework.GenericJob
}

type SegmentableJobHandler interface {
GetParentJob() client.Object
GetParentJobNamespaceName(childRequest ctrl.Request) ctrl.Request
GetChildJobNamespaceNameByWorkload(w *kueue.Workload) ctrl.Request
// Input of this function will be the request created by EventHandler
BuildSegmentableJob(childRequest ctrl.Request) SegmentableJob
}

---
package segmentablejob

import (
"context"

func (r *workflowReconciler)ReconcileGenericWorkflow(ctx context.Context, req ctrl.Request, wf GenericWorkflow) {
dropFinalizers := GetWorkflowInstance()
if dropFinalizers {
DropFinalizers()
return
}
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

// currently we do not support parent-workload for workflow
// stage should be implemented as GenericJob
stages := wf.GetActiveStages()
for _, stage := range stages {
// 1. make sure there is only a single existing instance of the workload.
// If there's no workload exists and stage is unsuspended, we'll stop it immediately.
// Stage should support suspend and resume
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
)

// 1.1 If the workload is pending deletion, suspend the stage if needed
// and drop the finalizer.
var FrameworkName = ""

// 2. handle stage is finished.
type EventHandlerFactory func(client.Client) handler.EventHandler
type Reconciler struct {
gvk schema.GroupVersionKind
jobObj client.Object
eventParserFactory EventHandlerFactory
handler SegmentableJobHandler

// 3. handle workload is nil.
c client.Client

// 4. update reclaimable counts if implemented by the stage
*jobframework.JobReconciler
}

// 5. handle WaitForPodsReady only for a standalone job.
// handle a job when waitForPodsReady is enabled, and it is the main job
func NewReconcilerFactory(gvk schema.GroupVersionKind,
jobObj client.Object,
eventParserFactory EventHandlerFactory,
handler SegmentableJobHandler) jobframework.ReconcilerFactory {
return func(client client.Client, record record.EventRecorder, opts ...jobframework.Option) jobframework.JobReconcilerInterface {
return &Reconciler{
gvk: gvk,
jobObj: jobObj,
eventParserFactory: eventParserFactory,
handler: handler,

c: client,

JobReconciler: jobframework.NewReconciler(client, record, opts...),
}
}
}

// 6. handle eviction
// You need to enqueue the namespace name for the child job in eventHandler so that we can build the child Job when we call Reconcile.
// The namespace name of the parent job will be fetched by handler.GetParentJobNamespaceName when we call Reconcile.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
return r.ReconcileGenericJob(ctx, r.handler.GetParentJobNamespaceName(req), r.handler.BuildSegmentableJob(req))
}

// 7. handle job is suspended.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
concurrency := mgr.GetControllerOptions().GroupKindConcurrency[r.gvk.GroupKind().String()]
ctrl.Log.V(3).Info("Setting up reconciler for SegmentableJob", "gvk", r.gvk.GroupKind().String(), "concurrency", concurrency)
return ctrl.NewControllerManagedBy(mgr).
Watches(r.jobObj, r.eventParserFactory(r.c)).Named(r.gvk.GroupKind().String()).
Watches(&kueue.Workload{}, &parentWorkloadHandler{handler: r.handler}).
WithOptions(controller.Options{
MaxConcurrentReconciles: concurrency,
}).
Complete(r)
}

// 8. handle job is unsuspended.
r.ReconcileStage(ctx, req, stage, wf)
}
type parentWorkloadHandler struct {
handler SegmentableJobHandler
}

func (h *parentWorkloadHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) {
workload, ok := e.Object.(*kueue.Workload)
if !ok {
return
}
namespaceName := h.handler.GetChildJobNamespaceNameByWorkload(workload)
q.AddRateLimited(namespaceName)
}

func (h *parentWorkloadHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) {
workload, ok := e.ObjectNew.(*kueue.Workload)
if !ok {
return
}
namespaceName := h.handler.GetChildJobNamespaceNameByWorkload(workload)
q.AddRateLimited(namespaceName)
}

func (h *parentWorkloadHandler) Delete(_ context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) {
workload, ok := e.Object.(*kueue.Workload)
if !ok {
return
}
namespaceName := h.handler.GetChildJobNamespaceNameByWorkload(workload)
q.AddRateLimited(namespaceName)
}

func (h *parentWorkloadHandler) Generic(_ context.Context, _ event.GenericEvent, _ workqueue.RateLimitingInterface) {
}

```

Each workflow managers can implement their own controller to integrate with the workflow framework.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"sigs.k8s.io/kueue/pkg/podset"
)

// GenericJob if the interface which needs to be implemented by all jobs
// GenericJob is the interface which needs to be implemented by all jobs
// managed by the kueue's jobframework.
type GenericJob interface {
// Object returns the job instance.
Expand Down

0 comments on commit 1ed89ab

Please sign in to comment.