Skip to content

Commit

Permalink
Show CLI messaged when a connection to the platform is lost/restored
Browse files Browse the repository at this point in the history
  • Loading branch information
matt-aitken committed Feb 7, 2025
1 parent 6088c27 commit 8ec03c5
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
13 changes: 13 additions & 0 deletions packages/cli-v3/src/dev/devOutput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "../utilities/cliOutput.js";
import { eventBus, EventBusEventArgs } from "../utilities/eventBus.js";
import { logger } from "../utilities/logger.js";
import { Socket } from "socket.io-client";

export type DevOutputOptions = {
name: string | undefined;
Expand Down Expand Up @@ -173,13 +174,23 @@ export function startDevOutput(options: DevOutputOptions) {
);
};

const socketConnectionDisconnected = (reason: Socket.DisconnectReason) => {
logger.log(chalkGrey(`○ Connection was lost: ${reason}`));
};

const socketConnectionReconnected = (reason: string) => {
logger.log(chalkGrey(`○ Connection was restored`));
};

eventBus.on("rebuildStarted", rebuildStarted);
eventBus.on("buildStarted", buildStarted);
eventBus.on("workerSkipped", workerSkipped);
eventBus.on("backgroundWorkerInitialized", backgroundWorkerInitialized);
eventBus.on("runStarted", runStarted);
eventBus.on("runCompleted", runCompleted);
eventBus.on("backgroundWorkerIndexingError", backgroundWorkerIndexingError);
eventBus.on("socketConnectionDisconnected", socketConnectionDisconnected);
eventBus.on("socketConnectionReconnected", socketConnectionReconnected);

return () => {
eventBus.off("rebuildStarted", rebuildStarted);
Expand All @@ -189,6 +200,8 @@ export function startDevOutput(options: DevOutputOptions) {
eventBus.off("runStarted", runStarted);
eventBus.off("runCompleted", runCompleted);
eventBus.off("backgroundWorkerIndexingError", backgroundWorkerIndexingError);
eventBus.off("socketConnectionDisconnected", socketConnectionDisconnected);
eventBus.off("socketConnectionReconnected", socketConnectionReconnected);
};
}

Expand Down
33 changes: 16 additions & 17 deletions packages/cli-v3/src/dev/devSupervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
WorkerClientToServerEvents,
WorkerServerToClientEvents,
} from "@trigger.dev/core/v3/workers";
import { debounce } from "@trigger.dev/core/debounce";

export type WorkerRuntimeOptions = {
name: string | undefined;
Expand Down Expand Up @@ -56,6 +55,7 @@ class DevSupervisor implements WorkerRuntime {

/** Receive notifications when runs change state */
private socket: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
private socketIsReconnecting = false;

/** Workers are versions of the code */
private workers: Map<string, BackgroundWorker> = new Map();
Expand All @@ -65,8 +65,6 @@ class DevSupervisor implements WorkerRuntime {

private socketConnections = new Set<string>();

private cancelAfterDelay = debounce(this.#cancelAllRunsIfDisconnected, 5_000);

constructor(public readonly options: WorkerRuntimeOptions) {}

async init(): Promise<void> {
Expand Down Expand Up @@ -192,7 +190,7 @@ class DevSupervisor implements WorkerRuntime {
});

if (!result.success) {
logger.error(`[DevSupervisor] dequeueRuns. Failed to dequeue runs`, {
logger.debug(`[DevSupervisor] dequeueRuns. Failed to dequeue runs`, {
error: result.error,
});
setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithoutRun);
Expand All @@ -218,7 +216,7 @@ class DevSupervisor implements WorkerRuntime {
const worker = this.workers.get(message.backgroundWorker.friendlyId);

if (!worker) {
logger.error(
logger.debug(
`[DevSupervisor] dequeueRuns. Dequeued a run but there's no BackgroundWorker so we can't execute it`,
{
run: message.run.friendlyId,
Expand All @@ -232,7 +230,7 @@ class DevSupervisor implements WorkerRuntime {

let runController = this.runControllers.get(message.run.friendlyId);
if (runController) {
logger.error(
logger.debug(
`[DevSupervisor] dequeueRuns. Dequeuing a run that already has a runController`,
{
runController: message.run.friendlyId,
Expand Down Expand Up @@ -297,7 +295,7 @@ class DevSupervisor implements WorkerRuntime {

setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithRun);
} catch (error) {
logger.error(`[DevSupervisor] dequeueRuns. Error thrown`, { error });
logger.debug(`[DevSupervisor] dequeueRuns. Error thrown`, { error });
//dequeue again
setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithoutRun);
}
Expand All @@ -314,12 +312,12 @@ class DevSupervisor implements WorkerRuntime {

// Connection was lost and successfully reconnected
eventSource.addEventListener("reconnect", (event: any) => {
logger.info("[DevSupervisor] Presence connection restored");
logger.debug("[DevSupervisor] Presence connection restored");
});

// Handle messages that might have been missed during disconnection
eventSource.addEventListener("missed_events", (event: any) => {
logger.warn("[DevSupervisor] Missed some presence events during disconnection");
logger.debug("[DevSupervisor] Missed some presence events during disconnection");
});

// If you need to close it manually
Expand Down Expand Up @@ -409,6 +407,13 @@ class DevSupervisor implements WorkerRuntime {
this.socket.on("connect", () => {
logger.debug("[DevSupervisor] Connected to supervisor");

if (this.socket.recovered || this.socketIsReconnecting) {
logger.debug("[DevSupervisor] Socket recovered");
eventBus.emit("socketConnectionReconnected", `Connection was recovered`);
}

this.socketIsReconnecting = false;

for (const controller of this.runControllers.values()) {
controller.resubscribeToRunNotifications();
}
Expand All @@ -427,7 +432,8 @@ class DevSupervisor implements WorkerRuntime {
// the disconnection was initiated by the server, you need to manually reconnect
this.socket.connect();
} else {
// the disconnection was a network error
this.socketIsReconnecting = true;
eventBus.emit("socketConnectionDisconnected", reason);
}
});

Expand Down Expand Up @@ -530,13 +536,6 @@ class DevSupervisor implements WorkerRuntime {
worker.stop();
this.workers.delete(friendlyId);
}

/** Debounce cancelling all runs because the socket has been disconnected for a while */
async #cancelAllRunsIfDisconnected() {
const runFriendlyIds = this.runControllers.keys();

//todo actually cancel them
}
}

function gatherProcessEnv() {
Expand Down
3 changes: 3 additions & 0 deletions packages/cli-v3/src/utilities/eventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
} from "@trigger.dev/core/v3";
import { EventEmitter } from "node:events";
import { BackgroundWorker } from "../dev/backgroundWorker.js";
import { Socket } from "socket.io-client";

export type EventBusEvents = {
rebuildStarted: [BuildTarget];
Expand All @@ -15,6 +16,8 @@ export type EventBusEvents = {
backgroundWorkerIndexingError: [BuildManifest, Error];
runStarted: [BackgroundWorker, TaskRunExecution];
runCompleted: [BackgroundWorker, TaskRunExecution, TaskRunExecutionResult, number];
socketConnectionDisconnected: [Socket.DisconnectReason];
socketConnectionReconnected: [string];
};

export type EventBusEventArgs<T extends keyof EventBusEvents> = EventBusEvents[T];
Expand Down

0 comments on commit 8ec03c5

Please sign in to comment.