Skip to content

Commit

Permalink
boost
Browse files Browse the repository at this point in the history
  • Loading branch information
domdinicola committed Feb 3, 2025
1 parent c7b16c4 commit 612b2ce
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 5 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ jobs:
docker: ${{ steps.changes.outputs.docker_base }}
steps:
- run: git config --global --add safe.directory $(realpath .)
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
- uses: actions/checkout@v4
- id: changes
name: Check for backend file changes
uses: dorny/paths-filter@0bc4621a3135347011ad047f9ecf449bf72ce2bd # v3.0.0
uses: dorny/paths-filter@v3
with:
base: ${{ github.ref }}
token: ${{ github.token }}
Expand All @@ -53,4 +53,4 @@ jobs:
run: pip install ruff
- name: Check syntax
# Stop the build if there are Python syntax errors or undefined names
run: ruff check src/
run: ruff check -e
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dependencies = [
"paramiko",
"phonenumbers",
"psycopg[binary]",
"python-redis-lock[django]",
"sentry-sdk",
"social-auth-app-django",
"social-auth-core",
Expand All @@ -117,3 +118,10 @@ version = "0.1"
description = "HOPE integration with FSPs"
readme = "README.md"
license = {text = "MIT"}


[tool.nitpick]
style = [
"github://unicef/hope-code-conventions@main/django/django.toml"
]
cache = "1 day"
11 changes: 10 additions & 1 deletion src/hope_payment_gateway/apps/fsp/moneygram/tasks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import logging

from constance import config
from strategy_field.utils import fqn

from hope_payment_gateway.apps.fsp.moneygram.client import MoneyGramClient
from hope_payment_gateway.apps.gateway.models import (
PaymentInstruction,
PaymentInstructionState,
PaymentRecord,
PaymentRecordState,
AsyncJob,
)
from hope_payment_gateway.config.celery import app

Expand Down Expand Up @@ -36,7 +38,14 @@ def moneygram_send_money(tag=None, threshold=10000):

logging.info(f"Sending {records_count} records {pi} to MoneyGram")
records_ids = list(records.values_list("id", flat=True))
moneygram_notify.delay(records_ids)
job = AsyncJob.objects.create(
description="Send Instruction to MoneyGram",
type=AsyncJob.JobType.STANDARD_TASK,
action=fqn(moneygram_notify),
config={"to_process_ids": records_ids},
instruction=pi,
)
job.queue()
pi.status = PaymentInstructionState.PROCESSED
pi.save()

Expand Down
11 changes: 10 additions & 1 deletion src/hope_payment_gateway/apps/fsp/western_union/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from constance import config
from strategy_field.utils import fqn

from hope_payment_gateway.apps.fsp.western_union.api.client import WesternUnionClient
from hope_payment_gateway.apps.fsp.western_union.models import Corridor
Expand All @@ -10,6 +11,7 @@
PaymentInstructionState,
PaymentRecord,
PaymentRecordState,
AsyncJob,
)
from hope_payment_gateway.config.celery import app

Expand All @@ -35,7 +37,14 @@ def western_union_send_task(tag=None, threshold=10000):

logging.info(f"Sending {records_count} records {pi} to Western Union")
records_ids = list(records.values_list("id", flat=True))
western_union_notify.delay(records_ids)
job = AsyncJob.objects.create(
description="Send instruction to Western Union",
type=AsyncJob.JobType.STANDARD_TASK,
action=fqn(western_union_notify),
config={"to_process_ids": records_ids},
instruction=pi,
)
job.queue()
pi.status = PaymentInstructionState.PROCESSED
pi.save()

Expand Down
17 changes: 17 additions & 0 deletions src/hope_payment_gateway/apps/gateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from adminactions.export import base_export
from adminfilters.autocomplete import AutoCompleteFilter
from adminfilters.mixin import AdminFiltersMixin
from django_celery_boost.admin import CeleryTaskModelAdmin
from jsoneditor.forms import JSONEditor
from viewflow.fsm import TransitionNotAllowed

Expand All @@ -37,6 +38,7 @@
moneygram_update_status,
)
from hope_payment_gateway.apps.gateway.models import (
AsyncJob,
DeliveryMechanism,
ExportTemplate,
FinancialServiceProvider,
Expand Down Expand Up @@ -544,3 +546,18 @@ class ExportTemplateAdmin(ExtraButtonsMixin, admin.ModelAdmin):
list_display = ("fsp", "delivery_mechanism", "config_key")
search_fields = ("config_key", "delivery_mechanism__name", "fsp__name")
raw_id_fields = ("fsp", "delivery_mechanism")


@admin.register(AsyncJob)
class AsyncJobAdmin(AdminFiltersMixin, CeleryTaskModelAdmin, admin.ModelAdmin):
list_display = ("type", "verbose_status", "owner")
autocomplete_fields = ("owner", "content_type")
list_filter = (
("owner", AutoCompleteFilter),
"type",
)

def get_readonly_fields(self, request: "HttpRequest", obj: AsyncJob | None = None):
if obj:
return ("owner", "local_status", "type", "action", "sentry_id")
return super().get_readonly_fields(request, obj)
139 changes: 139 additions & 0 deletions src/hope_payment_gateway/apps/gateway/migrations/0027_asyncjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Generated by Django 5.1.4 on 2025-01-03 09:12

import concurrency.fields
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("gateway", "0026_alter_paymentinstruction_status"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]

operations = [
migrations.CreateModel(
name="AsyncJob",
fields=[
(
"id",
models.AutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"version",
concurrency.fields.AutoIncVersionField(default=0, help_text="record revision number"),
),
(
"curr_async_result_id",
models.CharField(
blank=True,
editable=False,
help_text="Current (active) AsyncResult is",
max_length=36,
null=True,
),
),
(
"last_async_result_id",
models.CharField(
blank=True,
editable=False,
help_text="Latest executed AsyncResult is",
max_length=36,
null=True,
),
),
(
"datetime_created",
models.DateTimeField(auto_now_add=True, help_text="Creation date and time"),
),
(
"datetime_queued",
models.DateTimeField(
blank=True,
help_text="Queueing date and time",
null=True,
verbose_name="Queued At",
),
),
(
"repeatable",
models.BooleanField(
blank=True,
default=False,
help_text="Indicate if the job can be repeated as-is",
),
),
(
"celery_history",
models.JSONField(blank=True, default=dict, editable=False),
),
(
"local_status",
models.CharField(
blank=True,
default="",
editable=False,
max_length=100,
null=True,
),
),
(
"group_key",
models.CharField(
blank=True,
editable=False,
help_text="Tasks with the same group key will not run in parallel",
max_length=255,
null=True,
),
),
(
"type",
models.CharField(
choices=[
("STANDARD_TASK", "Standard Task"),
("ADMIN_ACTION", "Admin Action"),
("JOB_TASK", "Job Task"),
],
max_length=50,
),
),
("config", models.JSONField(blank=True, default=dict)),
("action", models.CharField(blank=True, max_length=500, null=True)),
(
"description",
models.CharField(blank=True, max_length=255, null=True),
),
("sentry_id", models.CharField(blank=True, max_length=255, null=True)),
(
"instruction",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="jobs",
to="gateway.paymentinstruction",
),
),
(
"owner",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="%(app_label)s_%(class)s_jobs",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"permissions": (("debug_job", "Can debug background jobs"),),
"abstract": False,
},
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 5.1.4 on 2025-01-07 13:06

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("gateway", "0027_asyncjob"),
]

operations = [
migrations.AlterField(
model_name="asyncjob",
name="type",
field=models.CharField(
choices=[("STANDARD_TASK", "Standard Task"), ("JOB_TASK", "Job Task")],
max_length=50,
),
),
]
6 changes: 6 additions & 0 deletions src/hope_payment_gateway/apps/gateway/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.utils.translation import gettext_lazy as _

from adminactions.api import delimiters, quotes
from django_celery_boost.models import AsyncJobModel
from model_utils.models import TimeStampedModel
from strategy_field.fields import StrategyField

Expand Down Expand Up @@ -200,3 +201,8 @@ class Meta:

def __str__(self) -> str:
return f"{self.fsp} / {self.config_key}"


class AsyncJob(AsyncJobModel):
instruction = models.ForeignKey(PaymentInstruction, related_name="jobs", on_delete=models.CASCADE)
celery_task_name = "hope_payment_gateway.apps.gateway.tasks.sync_job_task"
59 changes: 59 additions & 0 deletions src/hope_payment_gateway/apps/gateway/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import contextlib
import logging
from typing import TYPE_CHECKING, Any

from django.core.cache import cache

import sentry_sdk
from redis_lock import Lock

from hope_payment_gateway.apps.gateway.models import AsyncJob
from hope_payment_gateway.config.celery import app

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
from redis_lock.django_cache import RedisCache

cache: RedisCache


@contextlib.contextmanager
def lock_job(job: AsyncJob) -> Lock:
lock = None
if job.group_key:
lock_key = f"lock:{job.group_key}"
# Get a lock with a 60-second lifetime but keep renewing it automatically
# to ensure the lock is held for as long as the Python process is running.
lock = cache.lock(lock_key, 60, auto_renewal=True)
yield lock.__enter__()
else:
yield
if lock:
lock.release()


@app.task()
def sync_job_task(pk: int, version: int) -> dict[str, Any]:
try:
job: AsyncJob = AsyncJob.objects.select_related("owner").get(pk=pk, version=version)
except AsyncJob.DoesNotExist as e:
sentry_sdk.capture_exception(e)
raise e

with lock_job(job):
try:
scope = sentry_sdk.get_current_scope()
if job.owner:
sentry_sdk.set_user = {"id": job.owner.pk, "email": job.owner.email}
return job.execute()
except Exception:
# error is logged in job.execute
raise
finally:
scope.clear()


@app.task()
def removed_expired_jobs(**kwargs):
AsyncJob.objects.filter(**kwargs).delete()

0 comments on commit 612b2ce

Please sign in to comment.