From cf8acfee1555e86d83d8dce98a6370a96a5297c8 Mon Sep 17 00:00:00 2001 From: Vincent Paturet Date: Wed, 22 Jan 2025 15:37:40 +0100 Subject: [PATCH] Conditional waiting on the previous polling task --- .../updater/spi/PollingGraphUpdater.java | 38 +++++++-- .../updater/spi/PollingGraphUpdaterTest.java | 78 ------------------- 2 files changed, 33 insertions(+), 83 deletions(-) delete mode 100644 application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java diff --git a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java index e8faa4b6aba..853e02a302f 100644 --- a/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java +++ b/application/src/main/java/org/opentripplanner/updater/spi/PollingGraphUpdater.java @@ -3,6 +3,7 @@ import java.time.Duration; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.opentripplanner.framework.application.OTPFeature; import org.opentripplanner.updater.GraphWriterRunnable; import org.slf4j.Logger; @@ -42,6 +43,12 @@ public abstract class PollingGraphUpdater implements GraphUpdater { */ protected WriteToGraphCallback saveResultOnGraph; + /** + * Handle on the previous polling execution. + * Initially null when the polling updater starts. + */ + private Future previousTask; + /** Shared configuration code for all polling graph updaters. */ protected PollingGraphUpdater(PollingGraphUpdaterParameters config) { this.pollingPeriod = config.frequency(); @@ -55,6 +62,10 @@ public Duration pollingPeriod() { @Override public final void run() { try { + if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) { + waitForPreviousTask(); + } + // Run concrete polling graph updater's implementation method. runPolling(); if (runOnlyOnce()) { @@ -113,11 +124,28 @@ public final void setup(WriteToGraphCallback writeToGraphCallback) { */ protected abstract void runPolling() throws Exception; - protected final void updateGraph(GraphWriterRunnable task) - throws ExecutionException, InterruptedException { - var result = saveResultOnGraph.execute(task); - if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) { - result.get(); + /** + * Post the update task in the GraphWriter queue. + * This is non-blocking. + */ + protected final void updateGraph(GraphWriterRunnable task) { + previousTask = saveResultOnGraph.execute(task); + } + + /** + * If the previous task takes longer than the polling interval, + * we delay the next polling cycle until the task is complete. + * This prevents tasks from piling up. + * */ + private void waitForPreviousTask() throws InterruptedException, ExecutionException { + if (previousTask != null && !previousTask.isDone()) { + LOG.info("Delaying polling until the previous task is complete"); + long startBlockingWait = System.currentTimeMillis(); + previousTask.get(); + LOG.info( + "Resuming polling after waiting an additional {}s", + (System.currentTimeMillis() - startBlockingWait) / 1000 + ); } } } diff --git a/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java b/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java deleted file mode 100644 index 6132988d92b..00000000000 --- a/application/src/test/java/org/opentripplanner/updater/spi/PollingGraphUpdaterTest.java +++ /dev/null @@ -1,78 +0,0 @@ -package org.opentripplanner.updater.spi; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.time.Duration; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.opentripplanner.framework.application.OTPFeature; -import org.opentripplanner.updater.GraphWriterRunnable; - -public class PollingGraphUpdaterTest { - - private static final PollingGraphUpdaterParameters config = new PollingGraphUpdaterParameters() { - @Override - public Duration frequency() { - return Duration.ZERO; - } - - @Override - public String configRef() { - return ""; - } - }; - - private static final PollingGraphUpdater subject = new PollingGraphUpdater(config) { - @Override - protected void runPolling() {} - }; - - private boolean updateCompleted; - - @BeforeAll - static void beforeAll() { - subject.setup(runnable -> CompletableFuture.runAsync(() -> runnable.run(null))); - } - - @BeforeEach - void setUp() { - updateCompleted = false; - } - - private final GraphWriterRunnable graphWriterRunnable = context -> { - try { - Thread.sleep(100); - updateCompleted = true; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }; - - @Test - void testUpdateGraphWithWaitFeatureOn() { - OTPFeature.WaitForGraphUpdateInPollingUpdaters.testOn(() -> { - callUpdater(); - assertTrue(updateCompleted); - }); - } - - @Test - void testProcessGraphUpdaterResultWithWaitFeatureOff() { - OTPFeature.WaitForGraphUpdateInPollingUpdaters.testOff(() -> { - callUpdater(); - assertFalse(updateCompleted); - }); - } - - private void callUpdater() { - try { - subject.updateGraph(graphWriterRunnable); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - } -}