diff --git a/redash/tasks/queries/execution.py b/redash/tasks/queries/execution.py index a863903cdb..a545e29cba 100644 --- a/redash/tasks/queries/execution.py +++ b/redash/tasks/queries/execution.py @@ -15,6 +15,7 @@ from redash.tasks.failure_report import track_failure from redash.tasks.worker import Job, Queue from redash.utils import gen_query_hash, utcnow +from redash.utils.locks import acquire_lock, release_lock from redash.worker import get_job_logger logger = get_job_logger(__name__) @@ -34,14 +35,18 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query logger.info("Inserting job for %s with metadata=%s", query_hash, metadata) try_count = 0 job = None + job_lock_id = _job_lock_id(query_hash, data_source.id) while try_count < 5: try_count += 1 + identifier = acquire_lock(job_lock_id) + if identifier is None: + continue pipe = redis_connection.pipeline() try: - pipe.watch(_job_lock_id(query_hash, data_source.id)) - job_id = pipe.get(_job_lock_id(query_hash, data_source.id)) + pipe.watch(job_lock_id) + job_id = pipe.get(job_lock_id) if job_id: logger.info("[%s] Found existing job: %s", query_hash, job_id) job_complete = None @@ -66,7 +71,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query if lock_is_irrelevant: logger.info("[%s] %s, removing lock", query_hash, message) - redis_connection.delete(_job_lock_id(query_hash, data_source.id)) + redis_connection.delete(job_lock_id) job = None if not job: @@ -115,6 +120,7 @@ def enqueue_query(query, data_source, user_id, is_api_key=False, scheduled_query except redis.WatchError: continue finally: + release_lock(job_lock_id, identifier) pipe.reset() if not job: diff --git a/redash/utils/locks.py b/redash/utils/locks.py new file mode 100644 index 0000000000..126496096f --- /dev/null +++ b/redash/utils/locks.py @@ -0,0 +1,59 @@ +import random +import time +import uuid +import logging +from redash import redis_connection +from redis import WatchError + +logger 
logger = logging.getLogger(__name__)


def acquire_lock(name, acquire_timeout=10, lock_timeout=5):
    """Try to acquire a distributed Redis lock named ``lock:<name>``.

    Spins with exponential backoff (plus jitter) until the lock is taken
    or *acquire_timeout* seconds elapse.

    :param name: logical lock name; the Redis key used is ``lock:<name>``.
    :param acquire_timeout: seconds to keep retrying before giving up.
    :param lock_timeout: TTL (seconds) set on the lock key so a crashed
        holder cannot block other clients forever.
    :returns: the unique owner identifier (str) on success, ``None`` on
        timeout.
    """
    identifier = str(uuid.uuid4())
    lock_name = f"lock:{name}"
    end = time.time() + acquire_timeout

    # Exponential backoff with jitter, capped at max_delay, so many waiting
    # clients do not hammer Redis in lock-step (thundering herd).
    base_delay = 0.001
    max_delay = 0.05

    while time.time() < end:
        # NX: only set if nobody else holds the key; EX: give the lock a
        # TTL so it self-releases if the holder dies before release_lock.
        if redis_connection.set(lock_name, identifier, ex=lock_timeout, nx=True):
            logger.info("acquire_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
            return identifier

        delay = base_delay + random.uniform(0, base_delay)
        time.sleep(min(delay, max_delay))
        base_delay = min(base_delay * 2, max_delay)

    return None


def release_lock(name, identifier):
    """Release the lock ``lock:<name>`` if *identifier* still owns it.

    Uses WATCH/MULTI so the check-then-delete is atomic: the key is deleted
    only when its current value equals *identifier*. This prevents one
    client from deleting a lock that has since expired and been re-acquired
    by another client.

    :param name: logical lock name passed to :func:`acquire_lock`.
    :param identifier: the owner token returned by :func:`acquire_lock`.
    :returns: ``True`` if the lock was deleted, ``False`` otherwise.
    """
    lock_name = f"lock:{name}"
    logger.info("release_lock, lock_name=[%s], identifier=[%s]", lock_name, identifier)
    with redis_connection.pipeline() as pipe:
        while True:
            try:
                pipe.watch(lock_name)
                current = pipe.get(lock_name)
                # BUGFIX: redis-py returns bytes unless the connection was
                # created with decode_responses=True, while `identifier` is
                # a str. Under the default config `bytes == str` is always
                # False, so the lock was never explicitly deleted and only
                # expired via its TTL. Normalize before comparing so the
                # ownership check works under either configuration.
                if isinstance(current, bytes):
                    current = current.decode("utf-8")
                if current == identifier:
                    pipe.multi()
                    pipe.delete(lock_name)
                    pipe.execute()
                    logger.info("Lock released successfully, lock_name=[%s], identifier=[%s]", lock_name, identifier)
                    return True
                pipe.unwatch()
                logger.warning(
                    "Lock not owned by this identifier, lock_name=[%s], identifier=[%s]", lock_name, identifier
                )
                break
            except WatchError:
                # The watched key changed between WATCH and EXEC; retry the
                # whole check-and-delete sequence.
                logger.warning(
                    "WatchError occurred, retrying lock release, lock_name=[%s], identifier=[%s]",
                    lock_name,
                    identifier,
                )
            except Exception as e:
                # Best-effort release: the TTL set at acquisition guarantees
                # eventual cleanup even if this fails, so log and bail out.
                logger.error("Error releasing lock: %s", str(e))
                break

    return False