From 49b8134f1d0236199723b9bd0a35e7efb0e8ecee Mon Sep 17 00:00:00 2001
From: Vincent Paturet <vincent.paturet@entur.org>
Date: Wed, 22 Jan 2025 15:37:40 +0100
Subject: [PATCH] Conditional waiting on the previous polling task

---
 .../updater/spi/PollingGraphUpdater.java      | 38 +++++++--
 .../updater/spi/PollingGraphUpdaterTest.java  | 78 -------------------
 2 files changed, 33 insertions(+), 83 deletions(-)
 delete mode 100644 application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java

diff --git a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java
index ebd14528187..2de3121a261 100644
--- a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java
+++ b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java
@@ -3,6 +3,7 @@
 import java.time.Duration;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import org.opentripplanner.framework.application.OTPFeature;
 import org.opentripplanner.updater.GraphWriterRunnable;
 import org.slf4j.Logger;
@@ -42,6 +43,12 @@ public abstract class PollingGraphUpdater implements GraphUpdater {
    */
   private WriteToGraphCallback saveResultOnGraph;
 
+  /**
+   * Handle on the previous polling execution.
+   * Initially null when the polling updater starts.
+   */
+  private Future<?> previousTask;
+
   /** Shared configuration code for all polling graph updaters. */
   protected PollingGraphUpdater(PollingGraphUpdaterParameters config) {
     this.pollingPeriod = config.frequency();
@@ -55,6 +62,10 @@ public Duration pollingPeriod() {
   @Override
   public final void run() {
     try {
+      if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
+        waitForPreviousTask();
+      }
+
       // Run concrete polling graph updater's implementation method.
       runPolling();
       if (runOnlyOnce()) {
@@ -113,11 +124,28 @@ public final void setup(WriteToGraphCallback writeToGraphCallback) {
    */
   protected abstract void runPolling() throws Exception;
 
-  protected final void updateGraph(GraphWriterRunnable task)
-    throws ExecutionException, InterruptedException {
-    var result = saveResultOnGraph.execute(task);
-    if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
-      result.get();
+  /**
+   * Post the update task in the GraphWriter queue.
+   * This is non-blocking.
+   */
+  protected final void updateGraph(GraphWriterRunnable task) {
+    previousTask = saveResultOnGraph.execute(task);
+  }
+
+  /**
+   * If the previous task takes longer than the polling interval,
+   * we delay the next polling cycle until the task is complete.
+   * This prevents tasks from piling up.
+   * */
+  private void waitForPreviousTask() throws InterruptedException, ExecutionException {
+    if (previousTask != null && !previousTask.isDone()) {
+      LOG.info("Delaying polling until the previous task is complete");
+      long startBlockingWait = System.currentTimeMillis();
+      previousTask.get();
+      LOG.info(
+        "Resuming polling after waiting an additional {}s",
+        (System.currentTimeMillis() - startBlockingWait) / 1000
+      );
     }
   }
 }
diff --git a/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java b/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java
deleted file mode 100644
index 6132988d92b..00000000000
--- a/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package org.opentripplanner.updater.spi;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import java.time.Duration;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.opentripplanner.framework.application.OTPFeature;
-import org.opentripplanner.updater.GraphWriterRunnable;
-
-public class PollingGraphUpdaterTest {
-
-  private static final PollingGraphUpdaterParameters config = new PollingGraphUpdaterParameters() {
-    @Override
-    public Duration frequency() {
-      return Duration.ZERO;
-    }
-
-    @Override
-    public String configRef() {
-      return "";
-    }
-  };
-
-  private static final PollingGraphUpdater subject = new PollingGraphUpdater(config) {
-    @Override
-    protected void runPolling() {}
-  };
-
-  private boolean updateCompleted;
-
-  @BeforeAll
-  static void beforeAll() {
-    subject.setup(runnable -> CompletableFuture.runAsync(() -> runnable.run(null)));
-  }
-
-  @BeforeEach
-  void setUp() {
-    updateCompleted = false;
-  }
-
-  private final GraphWriterRunnable graphWriterRunnable = context -> {
-    try {
-      Thread.sleep(100);
-      updateCompleted = true;
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  };
-
-  @Test
-  void testUpdateGraphWithWaitFeatureOn() {
-    OTPFeature.WaitForGraphUpdateInPollingUpdaters.testOn(() -> {
-      callUpdater();
-      assertTrue(updateCompleted);
-    });
-  }
-
-  @Test
-  void testProcessGraphUpdaterResultWithWaitFeatureOff() {
-    OTPFeature.WaitForGraphUpdateInPollingUpdaters.testOff(() -> {
-      callUpdater();
-      assertFalse(updateCompleted);
-    });
-  }
-
-  private void callUpdater() {
-    try {
-      subject.updateGraph(graphWriterRunnable);
-    } catch (ExecutionException | InterruptedException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}