From af496fe5e3d49c873555bcb27ad4a85e47e35c11 Mon Sep 17 00:00:00 2001
From: guyu
Date: Fri, 24 Jan 2025 11:41:18 +0800
Subject: [PATCH 1/4] for same query_text refresh just execution once

---
 redash/tasks/queries/execution.py | 12 +++++--
 redash/utils/locks.py             | 53 +++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 3 deletions(-)
 create mode 100644 redash/utils/locks.py

diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py
index a863903cdb..a545e29cba 100644
--- a/redash/tasks/queries/execution.py
+++ b/redash/tasks/queries/execution.py
@@ -15,6 +15,7 @@
 from redash.tasks.failure_report import track_failure
 from redash.tasks.worker import Job, Queue
 from redash.utils import gen_query_hash, utcnow
+from redash.utils.locks import acquire_lock, release_lock
 from redash.worker import get_job_logger
 
 logger = get_job_logger(__name__)
@@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
     logger.info("Inserting job for %s with metadata=%s", query_hash, metadata)
     try_count = 0
     job = None
+    job_lock_id = _job_lock_id(query_hash, data_source.id)
 
     while try_count < 5:
         try_count += 1
 
+        identifier = acquire_lock(job_lock_id)
+        if identifier is None:
+            continue
         pipe = redis_connection.pipeline()
         try:
-            pipe.watch(_job_lock_id(query_hash, data_source.id))
-            job_id = pipe.get(_job_lock_id(query_hash, data_source.id))
+            pipe.watch(job_lock_id)
+            job_id = pipe.get(job_lock_id)
             if job_id:
                 logger.info("[%s] Found existing job: %s", query_hash, job_id)
                 job_complete = None
@@ -66,7 +71,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
 
                     if lock_is_irrelevant:
                         logger.info("[%s] %s, removing lock", query_hash, message)
-                        redis_connection.delete(_job_lock_id(query_hash, data_source.id))
+                        redis_connection.delete(job_lock_id)
                         job = None
 
             if not job:
@@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query
         except redis.WatchError:
             continue
         finally:
+            release_lock(job_lock_id, identifier)
             pipe.reset()
 
     if not job:
diff --git a/redash/utils/locks.py b/redash/utils/locks.py
new file mode 100644
index 0000000000..ca1324a87e
--- /dev/null
+++ b/redash/utils/locks.py
@@ -0,0 +1,53 @@
+import random
+import time
+import uuid
+import logging
+from redis import WatchError
+from redash import redis_connection
+
+logger = logging.getLogger(__name__)
+
+
+def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
+    identifier = str(uuid.uuid4())
+    lock_name = f"lock:{name}"
+    end = time.time() + acquire_timeout
+
+    base_delay = 0.001
+    max_delay = 0.05
+
+    while time.time() < end:
+        if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
+            logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+            return identifier
+
+        delay = base_delay + random.uniform(0, base_delay)
+        time.sleep(min(delay, max_delay))
+        base_delay = min(base_delay * 2, max_delay)
+
+    return None
+
+
+def release_lock(name, identifier):
+    lock_name = f"lock:{name}"
+    logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+    with redis_connection.pipeline() as pipe:
+        while True:
+            try:
+                pipe.watch(lock_name)
+                if pipe.get(lock_name) == identifier:
+                    pipe.multi()
+                    pipe.delete(lock_name)
+                    pipe.execute()
+                    logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                    return True
+                pipe.unwatch()
+                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                break
+            except WatchError:
+                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+            except Exception as e:
+                logger.error("Error releasing lock: %s", str(e))
+                break
+
+    return False

From f841b217e8173e01062e3bb13ac5ce5b60c7c655 Mon Sep 17 00:00:00 2001
From: guyu
Date: Fri, 24 Jan 2025 11:46:17 +0800
Subject: [PATCH 2/4] format

---
 redash/utils/locks.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/redash/utils/locks.py b/redash/utils/locks.py
index ca1324a87e..4660d8cdae 100644
--- a/redash/utils/locks.py
+++ b/redash/utils/locks.py
@@ -2,8 +2,8 @@
 import time
 import uuid
 import logging
-from redis import WatchError
 from redash import redis_connection
+from redis import WatchError
 
 logger = logging.getLogger(__name__)
 
@@ -42,10 +42,12 @@ def release_lock(name, identifier):
                     logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
                     return True
                 pipe.unwatch()
-                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name,
+                               identifier)
                 break
             except WatchError:
-                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name, identifier)
+                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name,
+                               identifier)
             except Exception as e:
                 logger.error("Error releasing lock: %s", str(e))
                 break

From 06c9a2b21a7d1ffca0c67a564e23caaf72a8221d Mon Sep 17 00:00:00 2001
From: guyu
Date: Fri, 24 Jan 2025 11:48:29 +0800
Subject: [PATCH 3/4] fix

---
 redash/utils/locks.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/redash/utils/locks.py b/redash/utils/locks.py
index 4660d8cdae..126496096f 100644
--- a/redash/utils/locks.py
+++ b/redash/utils/locks.py
@@ -42,12 +42,16 @@ def release_lock(name, identifier):
                     logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
                     return True
                 pipe.unwatch()
-                logger.warning("Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name,
-                               identifier)
+                logger.warning(
+                    "Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
+                )
                 break
             except WatchError:
-                logger.warning("WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]", lock_name,
-                               identifier)
+                logger.warning(
+                    "WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
+                    lock_name,
+                    identifier,
+                )
             except Exception as e:
                 logger.error("Error releasing lock: %s", str(e))
                 break

From 5cfa6bc217cee45b6d1fd8671efbaa735a6c8478 Mon Sep 17 00:00:00 2001
From: Arik Fraimovich
Date: Fri, 31 Jan 2025 10:29:54 +0200
Subject: [PATCH 4/4] Update ci.yml to match latest master

---
 .github/workflows/ci.yml | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 520f5c0a60..1cee14f8ab 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,7 +3,7 @@ on:
   push:
     branches:
       - master
-  pull_request_target:
+  pull_request:
     branches:
       - master
 env:
@@ -60,10 +60,10 @@ jobs:
           mkdir -p /tmp/test-results/unit-tests
           docker cp tests:/app/coverage.xml ./coverage.xml
           docker cp tests:/app/junit.xml /tmp/test-results/unit-tests/results.xml
-      - name: Upload coverage reports to Codecov
-        uses: codecov/codecov-action@v3
-        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
+      # - name: Upload coverage reports to Codecov
+      #   uses: codecov/codecov-action@v3
+      #   with:
+      #     token: ${{ secrets.CODECOV_TOKEN }}
       - name: Store Test Results
        uses: actions/upload-artifact@v4
         with:
@@ -134,9 +134,9 @@ jobs:
       COMPOSE_PROJECT_NAME: cypress
       CYPRESS_INSTALL_BINARY: 0
       PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: 1
-      PERCY_TOKEN: ${{ secrets.PERCY_TOKEN }}
-      CYPRESS_PROJECT_ID: ${{ secrets.CYPRESS_PROJECT_ID }}
-      CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
+      # PERCY_TOKEN: ${{ secrets.PERCY_TOKEN }}
+      # CYPRESS_PROJECT_ID: ${{ secrets.CYPRESS_PROJECT_ID }}
+      # CYPRESS_RECORD_KEY: ${{ secrets.CYPRESS_RECORD_KEY }}
     steps:
       - if: github.event.pull_request.mergeable == 'false'
         name: Exit if PR is not mergeable
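
Note for reviewers: the sketch below is illustrative only and is not part of the patch series. It shows how the new acquire_lock/release_lock pair from redash/utils/locks.py is meant to bracket a critical section, mirroring the acquire / try / finally / release pattern added to enqueue_query above. The wrapper function refresh_query_once and its lock-key string are hypothetical placeholders; only acquire_lock and release_lock come from the patch.

# Illustrative sketch, not part of the patch series.
from redash.utils.locks import acquire_lock, release_lock


def refresh_query_once(query_hash, data_source_id):
    # Hypothetical key for this sketch; the patch itself builds the key with
    # _job_lock_id(query_hash, data_source.id) inside enqueue_query.
    lock_id = f"query_refresh:{data_source_id}:{query_hash}"

    # acquire_lock returns a per-holder UUID token, or None if the Redis key
    # "lock:<lock_id>" could not be set within acquire_timeout seconds.
    identifier = acquire_lock(lock_id, acquire_timeout=10, lock_timeout=5)
    if identifier is None:
        return None  # another worker already holds the lock; skip this refresh

    try:
        # ... enqueue or execute the refresh exactly once while the lock is held ...
        return "enqueued"
    finally:
        # release_lock deletes "lock:<lock_id>" only while its stored value still
        # equals identifier, so a worker whose lock has expired cannot release a
        # lock that another worker has since re-acquired.
        release_lock(lock_id, identifier)

The lock_timeout expiry keeps a crashed worker from holding the lock forever, and the identifier check on release is what lets several workers race on the same query_hash without deleting each other's locks.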