Skip to content

Commit

Permalink
Add support for launching engine images
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobwgillespie committed Dec 2, 2023
1 parent 141aa41 commit f80e55b
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 1 deletion.
11 changes: 11 additions & 0 deletions proto/depot/cloud/v3/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ message RegisterMachineResponse {
oneof task {
PendingTask pending = 3;
BuildKitTask buildkit = 4;
EngineTask engine = 5;
}

message Mount {
Expand Down Expand Up @@ -86,6 +87,16 @@ message RegisterMachineResponse {
optional bool enable_context_logging = 17;
}

// EngineTask represents an instruction to start an engine daemon
message EngineTask {
string image = 1;
string server_name = 2;
Cert cert = 3;
Cert ca_cert = 4;
repeated Mount mounts = 5;
int32 cache_size = 6;
}

// Specifies sending buildkit profiling data to a remote endpoint.
message Profiler {
string endpoint = 1;
Expand Down
2 changes: 1 addition & 1 deletion src/gen/ts/depot/cloud/v3/machine_connect.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protoc-gen-connect-es v0.12.0 with parameter "target=ts,import_extension=none"
// @generated by protoc-gen-connect-es v0.13.0 with parameter "target=ts,import_extension=none"
// @generated from file depot/cloud/v3/machine.proto (package depot.cloud.v3, syntax proto3)
/* eslint-disable */
// @ts-nocheck
Expand Down
80 changes: 80 additions & 0 deletions src/gen/ts/depot/cloud/v3/machine_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ export class RegisterMachineResponse extends Message<RegisterMachineResponse> {
value: RegisterMachineResponse_BuildKitTask
case: 'buildkit'
}
| {
/**
* @generated from field: depot.cloud.v3.RegisterMachineResponse.EngineTask engine = 5;
*/
value: RegisterMachineResponse_EngineTask
case: 'engine'
}
| {case: undefined; value?: undefined} = {case: undefined}

constructor(data?: PartialMessage<RegisterMachineResponse>) {
Expand All @@ -162,6 +169,7 @@ export class RegisterMachineResponse extends Message<RegisterMachineResponse> {
{no: 2, name: 'token', kind: 'scalar', T: 9 /* ScalarType.STRING */},
{no: 3, name: 'pending', kind: 'message', T: RegisterMachineResponse_PendingTask, oneof: 'task'},
{no: 4, name: 'buildkit', kind: 'message', T: RegisterMachineResponse_BuildKitTask, oneof: 'task'},
{no: 5, name: 'engine', kind: 'message', T: RegisterMachineResponse_EngineTask, oneof: 'task'},
])

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RegisterMachineResponse {
Expand Down Expand Up @@ -533,6 +541,78 @@ export class RegisterMachineResponse_BuildKitTask extends Message<RegisterMachin
}
}

/**
* EngineTask represents an instruction to start an engine daemon
*
* @generated from message depot.cloud.v3.RegisterMachineResponse.EngineTask
*/
export class RegisterMachineResponse_EngineTask extends Message<RegisterMachineResponse_EngineTask> {
/**
* @generated from field: string image = 1;
*/
image = ''

/**
* @generated from field: string server_name = 2;
*/
serverName = ''

/**
* @generated from field: depot.cloud.v3.Cert cert = 3;
*/
cert?: Cert

/**
* @generated from field: depot.cloud.v3.Cert ca_cert = 4;
*/
caCert?: Cert

/**
* @generated from field: repeated depot.cloud.v3.RegisterMachineResponse.Mount mounts = 5;
*/
mounts: RegisterMachineResponse_Mount[] = []

/**
* @generated from field: int32 cache_size = 6;
*/
cacheSize = 0

constructor(data?: PartialMessage<RegisterMachineResponse_EngineTask>) {
super()
proto3.util.initPartial(data, this)
}

static readonly runtime: typeof proto3 = proto3
static readonly typeName = 'depot.cloud.v3.RegisterMachineResponse.EngineTask'
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{no: 1, name: 'image', kind: 'scalar', T: 9 /* ScalarType.STRING */},
{no: 2, name: 'server_name', kind: 'scalar', T: 9 /* ScalarType.STRING */},
{no: 3, name: 'cert', kind: 'message', T: Cert},
{no: 4, name: 'ca_cert', kind: 'message', T: Cert},
{no: 5, name: 'mounts', kind: 'message', T: RegisterMachineResponse_Mount, repeated: true},
{no: 6, name: 'cache_size', kind: 'scalar', T: 5 /* ScalarType.INT32 */},
])

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RegisterMachineResponse_EngineTask {
return new RegisterMachineResponse_EngineTask().fromBinary(bytes, options)
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): RegisterMachineResponse_EngineTask {
return new RegisterMachineResponse_EngineTask().fromJson(jsonValue, options)
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): RegisterMachineResponse_EngineTask {
return new RegisterMachineResponse_EngineTask().fromJsonString(jsonString, options)
}

static equals(
a: RegisterMachineResponse_EngineTask | PlainMessage<RegisterMachineResponse_EngineTask> | undefined,
b: RegisterMachineResponse_EngineTask | PlainMessage<RegisterMachineResponse_EngineTask> | undefined,
): boolean {
return proto3.util.equals(RegisterMachineResponse_EngineTask, a, b)
}
}

/**
* Specifies sending buildkit profiling data to a remote endpoint.
*
Expand Down
118 changes: 118 additions & 0 deletions src/tasks/engine.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import {isAbortError} from 'abort-controller-x'
import {execa} from 'execa'
import * as fsp from 'fs/promises'
import {onShutdown, onShutdownError} from 'node-graceful-shutdown'
import {RegisterMachineResponse, RegisterMachineResponse_EngineTask} from '../gen/ts/depot/cloud/v3/machine_pb'
import {ensureMounted, unmapBlockDevice, unmountDevice} from '../utils/mounts'
import {reportEngineHealth} from './engineHealth'

export async function startEngine(message: RegisterMachineResponse, task: RegisterMachineResponse_EngineTask) {
console.log('Starting engine')

let useCeph = false
for (const mount of task.mounts) {
await ensureMounted(mount.device, mount.path, mount.fsType, mount.cephVolume, mount.options)
if (mount.cephVolume) useCeph = true
}

const {machineId, token} = message
const headers = {Authorization: `Bearer ${token}`}

await fsp.writeFile('/etc/engine/tls.crt', task.cert!.cert, {mode: 0o644})
await fsp.writeFile('/etc/engine/tls.key', task.cert!.key, {mode: 0o644})
await fsp.writeFile('/etc/engine/tlsca.crt', task.caCert!.cert, {mode: 0o644})

const cacheSizeMB = task.cacheSize * 1000000

const args = [
'run',
'--rm',
'--privileged',
'--name',
'engine',
'-v',
'/etc/engine:/etc/engine:ro',
'-v',
'/var/lib/engine:/var/lib/engine',
'-p',
'443:443',
task.image,
'--addr',
'tcp://0.0.0.0:443',
'--root',
'/var/lib/engine',
'--tlscert',
'/etc/engine/tls.crt',
'--tlskey',
'/etc/engine/tls.key',
'--tlscacert',
'/etc/engine/tlsca.crt',
'--oci-worker-gc',
'--oci-worker-gc-keepstorage',
`${cacheSizeMB}`,
'--oci-max-parallelism',
'num-cpu',
]

const controller = new AbortController()
const signal = controller.signal

async function runEngine() {
try {
console.log('Execing engine')
await execa('/usr/bin/docker', args, {stdio: 'inherit', signal})
} catch (error) {
if (error instanceof Error && error.message.includes('Command failed with exit code 1')) {
// Ignore this error, it's expected when the process is killed.
} else if (isAbortError(error)) {
// Ignore this error, it's expected when the process is killed.
} else {
throw error
}
} finally {
controller.abort()
}
}

const engine = runEngine()

onShutdownError(async (error) => {
console.error('Error shutting down:', error)
})

onShutdown(async () => {
setTimeout(() => {
console.log('Shutdown timed out, killing process')
process.exit(1)
}, 1000 * 60).unref()

controller.abort()
try {
await engine
console.log('Engine exited')
} catch (error) {
console.log(`Engine exited with error: ${error}`)
}

for (const mount of task.mounts) {
if (mount.cephVolume) {
await unmountDevice(mount.path)
await unmapBlockDevice(mount.cephVolume.volumeName)
} else {
await unmountDevice(mount.path)
}
}
})

try {
await Promise.all([
engine,
reportEngineHealth({machineId, signal, headers, mounts: task.mounts}),
// reportUsage({machineId, signal, headers}),
])
} catch (error) {
throw error
} finally {
controller.abort()
}
}
113 changes: 113 additions & 0 deletions src/tasks/engineHealth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import {PlainMessage} from '@bufbuild/protobuf'
import {execa} from 'execa'
import {DiskSpace} from '../gen/ts/depot/cloud/v3/machine_pb'
import {sleep} from '../utils/common'
import {DiskStats, stats} from '../utils/disk'
import {client} from '../utils/grpc'

export interface ReportHealthParams {
machineId: string
signal: AbortSignal
headers: HeadersInit
mounts: Mount[]
}

export interface Mount {
device: string
path: string
}

export async function reportEngineHealth({machineId, signal, headers, mounts}: ReportHealthParams) {
while (true) {
if (signal.aborted) return

await waitForWorkers(signal)

try {
while (true) {
if (signal.aborted) return

const disk_stats = await Promise.all(mounts.map(({device, path}) => stats(device, path)))
const disks: PlainMessage<DiskSpace>[] = disk_stats
.filter((item: DiskStats | undefined): item is DiskStats => {
return item !== undefined
})
.map(({device, path, freeMb, totalMb, freeInodes, totalInodes}) => {
return {
device,
path,
freeMb,
totalMb,
freeInodes,
totalInodes,
}
})

await client.pingMachineHealth({machineId, disks}, {headers, signal})
await sleep(1000)
}
} catch (error) {
console.log('Error reporting health:', error)
}
await sleep(1000)
}
}

export async function waitForWorkers(signal: AbortSignal) {
let workers: EngineWorker[] = await listEngineWorkers()
while (!signal.aborted && workers.length === 0) {
console.log('Waiting for engine workers to start')
await sleep(250)
workers = await listEngineWorkers()
}
}

interface EngineWorker {
id: string
labels: Record<string, string>
platforms: {architecture: string; os: string; variant?: string}[]
gcPolicy: {
filter: string[] | null
all: boolean
keepDuration: number
keepBytes: number
}[]
buildkitVersion: {
package: string
version: string
revision: string
}
}

async function listEngineWorkers(): Promise<EngineWorker[]> {
try {
const res = await execa(
'buildctl',
[
'--tlsservername',
'localhost',
'--tlscert',
'/etc/engine/tls.crt',
'--tlskey',
'/etc/engine/tls.key',
'--tlscacert',
'/etc/engine/tlsca.crt',
'--timeout',
'1',
'--addr',
'tcp://localhost:443',
'debug',
'workers',
'--format',
'{{json .}}',
],
{reject: false},
)
if (res.exitCode !== 0) return []
const workers = JSON.parse(res.stdout)
return workers
} catch (err) {
console.log('Error listing engine workers', err)
return []
}
}

0 comments on commit f80e55b

Please sign in to comment.