Skip to content

Commit

Permalink
Forward tracing and parse user tracing headers (#845)
Browse files Browse the repository at this point in the history
## Summary
<!-- Succinctly describe your change, providing context, what you've
changed, and why. -->



## Checklist
<!-- Tick these items off as you progress. -->
<!-- If an item isn't applicable, ideally please strikeout the item by
wrapping it in "~~"" and suffix it with "N/A My reason for skipping
this." -->
<!-- e.g. "- [ ] ~~Added tests~~ N/A Only touches docs" -->

- [x] Added a [docs PR](https://github.com/inngest/website) that
references this PR
- [x] Added unit/integration tests
- [x] Added changesets if applicable

## Related
<!-- A space for any related links, issues, or PRs. -->
<!-- Linear issues are autolinked. -->
<!-- e.g. - INN-123 -->
<!-- GitHub issues/PRs can be linked using shorthand. -->
<!-- e.g. "- inngest/inngest#123" -->
<!-- Feel free to remove this section if there are no applicable related
links.-->
- INN-

---------

Co-authored-by: Aaron Harper <[email protected]>
  • Loading branch information
BrunoScheufler and amh4r authored Feb 17, 2025
1 parent 4ba95be commit a2aadb1
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 658 deletions.
5 changes: 5 additions & 0 deletions .changeset/many-stingrays-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

- Connect: Forward tracing and parse user tracing headers
13 changes: 12 additions & 1 deletion packages/inngest/src/components/connect/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
ReconnectError,
ConnectionLimitError,
waitWithCancel,
parseTraceCtx,
} from "./util.js";

const ResponseAcknowlegeDeadline = 5_000;
Expand Down Expand Up @@ -288,6 +289,8 @@ class WebSocketWorkerConnection implements WorkerConnection {
const asString = new TextDecoder().decode(msg.requestPayload);
const parsed = parseFnData(JSON.parse(asString));

const userTraceCtx = parseTraceCtx(msg.userTraceCtx);

return {
body() {
return parsed;
Expand All @@ -307,8 +310,9 @@ class WebSocketWorkerConnection implements WorkerConnection {
// Note: Signature is disabled for connect
return null;
case headerKeys.TraceParent.toString():
return userTraceCtx?.traceParent ?? null;
case headerKeys.TraceState.toString():
return null;
return userTraceCtx?.traceState ?? null;
default:
return null;
}
Expand All @@ -329,6 +333,7 @@ class WebSocketWorkerConnection implements WorkerConnection {

return SDKResponse.create({
requestId: msg.requestId,
accountId: msg.accountId,
envId: msg.envId,
appId: msg.appId,
status: sdkResponseStatus,
Expand All @@ -341,6 +346,8 @@ class WebSocketWorkerConnection implements WorkerConnection {
PREFERRED_EXECUTION_VERSION.toString(),
10
),
systemTraceCtx: msg.systemTraceCtx,
userTraceCtx: msg.userTraceCtx,
});
},
url() {
Expand Down Expand Up @@ -875,10 +882,14 @@ class WebSocketWorkerConnection implements WorkerConnection {
kind: GatewayMessageType.WORKER_REQUEST_ACK,
payload: WorkerRequestAckData.encode(
WorkerRequestAckData.create({
accountId: gatewayExecutorRequest.accountId,
envId: gatewayExecutorRequest.envId,
appId: gatewayExecutorRequest.appId,
functionSlug: gatewayExecutorRequest.functionSlug,
requestId: gatewayExecutorRequest.requestId,
stepId: gatewayExecutorRequest.stepId,
userTraceCtx: gatewayExecutorRequest.userTraceCtx,
systemTraceCtx: gatewayExecutorRequest.systemTraceCtx,
})
).finish(),
})
Expand Down
42 changes: 24 additions & 18 deletions packages/inngest/src/components/connect/protobuf/connect.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,27 @@ message WorkerConnectRequestData {
google.protobuf.Timestamp started_at = 12;
}

message GatewaySyncRequestData {
optional string deploy_id = 1;
}

message GatewayExecutorRequestData {
string request_id = 1;
string env_id = 2;
string app_id = 3;
string function_slug = 4;
optional string step_id = 5;
bytes request_payload = 6;
string account_id = 2;
string env_id = 3;
string app_id = 4;
string function_slug = 5;
optional string step_id = 6;
bytes request_payload = 7;
bytes system_trace_ctx = 8;
bytes user_trace_ctx = 9;
}

message WorkerRequestAckData {
string request_id = 1;
string app_id = 2;
string function_slug = 3;
optional string step_id = 4;
string account_id = 2;
string env_id = 3;
string app_id = 4;
string function_slug = 5;
optional string step_id = 6;
bytes system_trace_ctx = 7;
bytes user_trace_ctx = 8;
}

enum SDKResponseStatus {
Expand All @@ -91,14 +94,17 @@ enum SDKResponseStatus {

message SDKResponse {
string request_id = 1;
string env_id = 2;
string app_id = 3;
SDKResponseStatus status = 4;
bytes body = 5;
bool no_retry = 6;
optional string retry_after = 7;
string account_id = 2;
string env_id = 3;
string app_id = 4;
SDKResponseStatus status = 5;
bytes body = 6;
bool no_retry = 7;
optional string retry_after = 8;
string sdk_version = 9;
uint32 request_version = 10;
bytes system_trace_ctx = 11;
bytes user_trace_ctx = 12;
}

message WorkerReplyAckData {
Expand Down
36 changes: 36 additions & 0 deletions packages/inngest/src/components/connect/util.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { headerKeys } from "inngest/helpers/consts";

export class ReconnectError extends Error {
constructor(
message: string,
Expand Down Expand Up @@ -54,3 +56,37 @@ export function waitWithCancel(ms: number, cancelIf: () => boolean) {
}, 100);
});
}

function isObject(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}

function isString(value: unknown): value is string {
return typeof value === "string";
}

export function parseTraceCtx(serializedTraceCtx: Uint8Array<ArrayBufferLike>) {
const parsedTraceCtx: unknown =
serializedTraceCtx.length > 0
? JSON.parse(new TextDecoder().decode(serializedTraceCtx))
: null;

if (!isObject(parsedTraceCtx)) {
return null;
}

const traceParent = parsedTraceCtx[headerKeys.TraceParent];
if (!isString(traceParent)) {
return null;
}

const traceState = parsedTraceCtx[headerKeys.TraceState];
if (!isString(traceState)) {
return null;
}

return {
traceParent,
traceState,
};
}
Loading

0 comments on commit a2aadb1

Please sign in to comment.