Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Option to Prevent Overlapping Replications of the Same Artifact in Harbor #21347

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
14 changes: 10 additions & 4 deletions api/v2.0/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,7 @@ paths:
items:
$ref: '#/definitions/AuditLogEventType'
'401':
$ref: '#/responses/401'
$ref: '#/responses/401'
/projects/{project_name}/logs:
get:
summary: Get recent logs of the projects (deprecated)
Expand Down Expand Up @@ -1875,7 +1875,7 @@ paths:
'401':
$ref: '#/responses/401'
'500':
$ref: '#/responses/500'
$ref: '#/responses/500'
/p2p/preheat/providers:
get:
summary: List P2P providers
Expand Down Expand Up @@ -6960,8 +6960,8 @@ definitions:
description: The time when this operation is triggered.
AuditLogEventType:
type: object
properties:
event_type:
properties:
event_type:
type: string
description: the event type, such as create_user.
example: create_user
Expand Down Expand Up @@ -7457,6 +7457,12 @@ definitions:
type: boolean
description: Whether to enable copy by chunk.
x-isnullable: true
single_active_replication:
type: boolean
description: |-
Whether to skip a newly triggered execution while a previous execution of the same
replication rule is still active, so that the rule never runs multiple times in parallel.
x-isnullable: true # make this field optional to keep backward compatibility
ReplicationTrigger:
type: object
properties:
Expand Down
2 changes: 2 additions & 0 deletions make/migrations/postgresql/0160_2.13.0_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ CREATE INDEX IF NOT EXISTS idx_audit_log_ext_op_time ON audit_log_ext (op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_optime ON audit_log_ext (project_id, op_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_resource_type ON audit_log_ext (project_id, resource_type);
CREATE INDEX IF NOT EXISTS idx_audit_log_ext_project_id_operation ON audit_log_ext (project_id, operation);

ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS single_active_replication boolean;
4 changes: 2 additions & 2 deletions src/controller/jobmonitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ func NewMonitorController() MonitorController {
taskManager: task.NewManager(),
queueManager: jm.NewQueueClient(),
queueStatusManager: queuestatus.Mgr,
monitorClient: jobServiceMonitorClient,
monitorClient: JobServiceMonitorClient,
jobServiceRedisClient: jm.JobServiceRedisClient,
executionDAO: taskDao.NewExecutionDAO(),
}
}

func jobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
func JobServiceMonitorClient() (jm.JobServiceMonitorClient, error) {
cfg, err := job.GlobalClient.GetJobServiceConfig()
if err != nil {
return nil, err
Expand Down
38 changes: 38 additions & 0 deletions src/controller/replication/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ package replication

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/gocraft/work"

"github.com/goharbor/harbor/src/controller/event/operator"
"github.com/goharbor/harbor/src/controller/jobmonitor"
"github.com/goharbor/harbor/src/controller/replication/flow"
replicationmodel "github.com/goharbor/harbor/src/controller/replication/model"
"github.com/goharbor/harbor/src/jobservice/job"
Expand Down Expand Up @@ -109,10 +113,32 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
if op := operator.FromContext(ctx); op != "" {
extra["operator"] = op
}

id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger, extra)
if err != nil {
return 0, err
}

if policy.SingleActiveReplication {
monitorClient, err := jobmonitor.JobServiceMonitorClient()
if err != nil {
return 0, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get job monitor's client: %v", err)
}
observations, err := monitorClient.WorkerObservations()
if err != nil {
return 0, errors.New(nil).WithCode(errors.PreconditionCode).WithMessagef("unable to get jobs observations: %v", err)
}
for _, o := range observations {
if isDuplicateJob(o, policy.ID) {
err = c.execMgr.MarkSkipped(ctx, id, "Execution deferred: active replication still in progress.")
if err != nil {
return 0, err
}
return id, nil
}
}
}

// start the replication flow in background
// as the process runs inside a goroutine, the transaction in the outer ctx
// may be submitted already when the process starts, so create an new context
Expand Down Expand Up @@ -151,6 +177,18 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
return id, nil
}

// isDuplicateJob reports whether the worker observation o describes a
// replication job that belongs to the policy identified by policyID.
//
// It returns false when o is nil, when the observed job name is not
// job.ReplicationVendorType, when o.ArgsJSON cannot be decoded as a JSON
// object, or when the object carries no integer "policy_id" entry.
func isDuplicateJob(o *work.WorkerObservation, policyID int64) bool {
	// Guard against nil entries so a sparse observation list cannot panic.
	if o == nil || o.JobName != job.ReplicationVendorType {
		return false
	}

	// Keep the values raw so that "policy_id" can be decoded straight into an
	// int64; going through float64 would silently truncate fractions and lose
	// precision for IDs above 2^53.
	var args map[string]json.RawMessage
	if err := json.Unmarshal([]byte(o.ArgsJSON), &args); err != nil {
		return false
	}
	raw, found := args["policy_id"]
	if !found {
		return false
	}

	// A pointer target distinguishes JSON null (stays nil) from a real 0.
	var observedID *int64
	if err := json.Unmarshal(raw, &observedID); err != nil || observedID == nil {
		return false
	}
	return *observedID == policyID
}

func (c *controller) markError(ctx context.Context, executionID int64, err error) {
logger := log.GetLogger(ctx)
// try to stop the execution first in case that some tasks are already created
Expand Down
19 changes: 11 additions & 8 deletions src/controller/replication/flow/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (c *copyFlow) Run(ctx context.Context) error {
return err
}

return c.createTasks(ctx, srcResources, dstResources, c.policy.Speed, c.policy.CopyByChunk)
return c.createTasks(ctx, srcResources, dstResources, c.policy)
}

func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
Expand All @@ -103,7 +103,7 @@ func (c *copyFlow) isExecutionStopped(ctx context.Context) (bool, error) {
return execution.Status == job.StoppedStatus.String(), nil
}

func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, speed int32, copyByChunk bool) error {
func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources []*model.Resource, policy *repctlmodel.Policy) error {
var taskCnt int
defer func() {
// if no task be created, mark execution done.
Expand Down Expand Up @@ -137,19 +137,22 @@ func (c *copyFlow) createTasks(ctx context.Context, srcResources, dstResources [
JobKind: job.KindGeneric,
},
Parameters: map[string]interface{}{
"src_resource": string(src),
"dst_resource": string(dest),
"speed": speed,
"copy_by_chunk": copyByChunk,
"src_resource": string(src),
"dst_resource": string(dest),
"speed": policy.Speed,
"copy_by_chunk": policy.CopyByChunk,
"single_active_replication": policy.SingleActiveReplication,
"policy_id": policy.ID,
},
}

if _, err = c.taskMgr.Create(ctx, c.executionID, job, map[string]interface{}{
"operation": "copy",
"resource_type": string(srcResource.Type),
"resource_type": srcResource.Type,
"source_resource": getResourceName(srcResource),
"destination_resource": getResourceName(dstResource),
"references": getResourceReferences(dstResource)}); err != nil {
"references": getResourceReferences(dstResource),
}); err != nil {
return err
}

Expand Down
3 changes: 3 additions & 0 deletions src/controller/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Policy struct {
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
CopyByChunk bool `json:"copy_by_chunk"`
SingleActiveReplication bool `json:"single_active_replication"`
}

// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
Expand Down Expand Up @@ -141,6 +142,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
p.CopyByChunk = policy.CopyByChunk
p.SingleActiveReplication = policy.SingleActiveReplication

if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
Expand Down Expand Up @@ -186,6 +188,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
UpdateTime: p.UpdateTime,
Speed: p.Speed,
CopyByChunk: p.CopyByChunk,
SingleActiveReplication: p.SingleActiveReplication,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID
Expand Down
4 changes: 4 additions & 0 deletions src/jobservice/job/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
SuccessStatus Status = "Success"
// ScheduledStatus : job status scheduled
ScheduledStatus Status = "Scheduled"
// SkippedStatus : job status skipped
SkippedStatus Status = "Skipped"
)

// Status of job
Expand Down Expand Up @@ -62,6 +64,8 @@ func (s Status) Code() int {
return 3
case "Success":
return 3
case "Skipped":
return 3
default:
}

Expand Down
9 changes: 7 additions & 2 deletions src/jobservice/worker/cworker/c_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ type basicWorker struct {

// workerContext ...
// We did not use this context to pass context info so far, just a placeholder.
type workerContext struct{}
type workerContext struct {
// client is the work.Client copied from the basicWorker (w.client) when Start builds this context.
client *work.Client
}

// log the job
func (rpc *workerContext) logJob(job *work.Job, next work.NextMiddlewareFunc) error {
Expand Down Expand Up @@ -146,9 +148,12 @@ func (w *basicWorker) Start() error {
w.pool.Stop()
}()

workCtx := workerContext{
client: w.client,
}
// Start the backend worker pool
// Add middleware
w.pool.Middleware((*workerContext).logJob)
w.pool.Middleware(workCtx.logJob)
// Non blocking call
w.pool.Start()
logger.Infof("Basic worker is started")
Expand Down
1 change: 1 addition & 0 deletions src/pkg/replication/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Policy struct {
UpdateTime time.Time `orm:"column(update_time);auto_now"`
Speed int32 `orm:"column(speed_kb)"`
CopyByChunk bool `orm:"column(copy_by_chunk)"`
SingleActiveReplication bool `orm:"column(single_active_replication)"`
}

// TableName set table name for ORM
Expand Down
14 changes: 14 additions & 0 deletions src/pkg/retention/policy/rule/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,20 @@ func init() {
},
},
}, daysps.New, daysps.Valid)

// Register daysps
Register(&Metadata{
TemplateID: daysps.TemplateID,
Action: "immutable",
Parameters: []*IndexedParam{
{
Name: daysps.ParameterN,
Type: "int",
Unit: "days",
Required: true,
},
},
}, daysps.New, daysps.Valid)
}

// Register the rule evaluator with the corresponding rule template
Expand Down
13 changes: 13 additions & 0 deletions src/pkg/task/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ExecutionManager interface {
// In other cases, the execution status can be calculated from the referenced tasks automatically
// and no need to update it explicitly
MarkDone(ctx context.Context, id int64, message string) (err error)
// MarkSkipped marks the status of the specified execution as skipped.
MarkSkipped(ctx context.Context, id int64, message string) (err error)
// MarkError marks the status of the specified execution as error.
// It must be called to update the execution status when failed to create tasks.
// In other cases, the execution status can be calculated from the referenced tasks automatically
Expand Down Expand Up @@ -139,6 +141,17 @@ func (e *executionManager) UpdateExtraAttrs(ctx context.Context, id int64, extra
return e.executionDAO.Update(ctx, execution, "ExtraAttrs", "UpdateTime")
}

// MarkSkipped records the execution identified by id as skipped: it stores the
// skipped status together with the given message and stamps both the update
// time and the end time with the current time. Only those four columns are
// passed to the DAO update.
func (e *executionManager) MarkSkipped(ctx context.Context, id int64, message string) error {
	finishedAt := time.Now()
	skipped := &dao.Execution{
		ID:            id,
		Status:        job.SkippedStatus.String(),
		StatusMessage: message,
		UpdateTime:    finishedAt,
		EndTime:       finishedAt,
	}
	return e.executionDAO.Update(ctx, skipped, "Status", "StatusMessage", "UpdateTime", "EndTime")
}

func (e *executionManager) MarkDone(ctx context.Context, id int64, message string) error {
now := time.Now()
return e.executionDAO.Update(ctx, &dao.Execution{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,37 @@ <h3 class="modal-title">{{ headerTitle | translate }}</h3>
'REPLICATION.ENABLED_RULE' | translate
}}</label>
</div>
<div class="clr-checkbox-wrapper">
<input
type="checkbox"
class="clr-checkbox"
[checked]="true"
id="singleActiveReplication"
formControlName="single_active_replication" />
<label
for="singleActiveReplication"
class="clr-control-label single-active"
>{{
'REPLICATION.SINGLE_ACTIVE_REPLICATION'
| translate
}}
<clr-tooltip class="override-tooltip">
<clr-icon
clrTooltipTrigger
shape="info-circle"
size="24"></clr-icon>
<clr-tooltip-content
clrPosition="top-left"
clrSize="md"
*clrIfOpen>
<span>{{
'TOOLTIP.SINGLE_ACTIVE_REPLICATION'
| translate
}}</span>
</clr-tooltip-content>
</clr-tooltip>
</label>
</div>
</div>
</div>
</form>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ clr-modal {
width: 8.6rem;
}

.single-active {
width: 16rem;
}

.des-tooltip {
margin-left: 0.5rem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
override: true,
speed: -1,
copy_by_chunk: false,
single_active_replication: false,
});
}

Expand Down Expand Up @@ -367,6 +368,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
dest_namespace_replace_count: Flatten_Level.FLATTEN_LEVEl_1,
speed: -1,
copy_by_chunk: false,
single_active_replication: false,
});
this.isPushMode = true;
this.selectedUnit = BandwidthUnit.KB;
Expand Down Expand Up @@ -410,6 +412,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
override: rule.override,
speed: speed,
copy_by_chunk: rule.copy_by_chunk,
single_active_replication: rule.single_active_replication,
});
let filtersArray = this.getFilterArray(rule);
this.noSelectedEndpoint = false;
Expand Down
2 changes: 2 additions & 0 deletions src/portal/src/i18n/lang/de-de-lang.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"PULL_BASED": "Lade die Ressourcen von der entfernten Registry auf den lokalen Harbor runter.",
"DESTINATION_NAMESPACE": "Spezifizieren des Ziel-Namespace. Wenn das Feld leer ist, werden die Ressourcen unter dem gleichen Namespace abgelegt wie in der Quelle.",
"OVERRIDE": "Spezifizieren, ob die Ressourcen am Ziel überschrieben werden sollen, falls eine Ressource mit gleichem Namen existiert.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to defer execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "E-Mail sollte eine gültige E-Mail-Adresse wie [email protected] sein.",
"USER_NAME": "Darf keine Sonderzeichen enthalten und sollte kürzer als 255 Zeichen sein.",
"FULL_NAME": "Maximale Länge soll 20 Zeichen sein.",
Expand Down Expand Up @@ -560,6 +561,7 @@
"ALLOWED_CHARACTERS": "Erlaubte Sonderzeichen",
"TOTAL": "Gesamt",
"OVERRIDE": "Überschreiben",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Aktiviere Regel",
"OVERRIDE_INFO": "Überschreiben",
"OPERATION": "Operation",
Expand Down
Loading
Loading