Skip to content

Commit

Permalink
Merge branch 'archivist-jobs-fix' of 'https://github.com/jjmerchante/…
Browse files Browse the repository at this point in the history
…grimoirelab-core'

Merges #29
Closes #29
  • Loading branch information
sduenas authored Feb 7, 2025
2 parents 62cd1b9 + f3dc449 commit ad300c8
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
7 changes: 4 additions & 3 deletions src/grimoirelab/core/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ def server(ctx: Context, devel: bool, clear_tasks: bool):
should be run with a reverse proxy. If you activate the '--dev' flag,
a HTTP server will be run instead.
"""
create_background_tasks(clear_tasks)

env = os.environ

env["UWSGI_ENV"] = f"DJANGO_SETTINGS_MODULE={ctx.obj['cfg']}"
Expand Down Expand Up @@ -92,6 +90,8 @@ def server(ctx: Context, devel: bool, clear_tasks: bool):
_ = django.core.wsgi.get_wsgi_application()
maintain_tasks()

create_background_tasks(clear_tasks)

# Run the server
os.execvp("uwsgi", ("uwsgi",))

Expand Down Expand Up @@ -151,6 +151,7 @@ def create_background_tasks(clear_tasks: bool):
"""
from grimoirelab.core.scheduler.scheduler import schedule_task
from grimoirelab.core.scheduler.tasks.models import StorageTask
from grimoirelab.core.scheduler.models import SchedulerStatus

workers = settings.GRIMOIRELAB_ARCHIVIST['WORKERS']
storage_url = settings.GRIMOIRELAB_ARCHIVIST['STORAGE_URL']
Expand All @@ -163,7 +164,7 @@ def create_background_tasks(clear_tasks: bool):
StorageTask.objects.all().delete()
click.echo("Removing old background tasks.")

current = StorageTask.objects.filter(burst=False).count()
current = StorageTask.objects.filter(burst=False).exclude(status=SchedulerStatus.FAILED).count()
if workers == current:
click.echo("Background tasks already created. Skipping.")
return
Expand Down
35 changes: 28 additions & 7 deletions src/grimoirelab/core/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@
from typing import Any


RQ_JOB_STOPPED_STATUS = [
rq.job.JobStatus.FINISHED,
rq.job.JobStatus.FAILED,
rq.job.JobStatus.STOPPED,
rq.job.JobStatus.CANCELED
]

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -124,20 +131,34 @@ def maintain_tasks() -> None:
for task in tasks:
job_db = task.jobs.order_by('-scheduled_at').first()

try:
rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection(task.default_job_queue))
if not _is_job_removed_or_stopped(job_db, task.default_job_queue):
continue
except rq.exceptions.NoSuchJobError:
logger.debug(
f"Job #{job_db.job_id} in queue (task: {task.task_id}) missing. Rescheduling."
)

logger.debug(
f"Job #{job_db.job_id} in queue (task: {task.task_id}) stopped. Rescheduling."
)

current_time = datetime_utcnow()
scheduled_at = task.scheduled_at if task.scheduled_at > current_time else current_time
scheduled_at = max(task.scheduled_at, current_time)

_schedule_job(task, job_db, scheduled_at, job_db.job_args)


def _is_job_removed_or_stopped(job: Job, queue: str) -> bool:
"""
Check if the job was removed or stopped.
:param job: job to check.
:param queue: queue where the job is enqueued.
"""
try:
connection = django_rq.get_connection(queue)
job_rq = rq.job.Job.fetch(job.uuid, connection=connection)
return job_rq.get_status() in RQ_JOB_STOPPED_STATUS
except rq.exceptions.NoSuchJobError:
return True


def _enqueue_task(
task: Task,
scheduled_at: datetime.datetime | None = None
Expand Down

0 comments on commit ad300c8

Please sign in to comment.