Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support streams within diagrams #43

Open
mxgrey opened this issue Jan 2, 2025 · 0 comments
Open

Support streams within diagrams #43

mxgrey opened this issue Jan 2, 2025 · 0 comments

Comments

@mxgrey
Copy link
Contributor

mxgrey commented Jan 2, 2025

After #27 is finished, we should follow up with a feature to support connecting node streams within a diagram.

There are two aspects to consider:

1. Sending a stream output out of the workflow

We can have diagram-based workflows always provide a DiagramStream as their stream type. It might look something like this:

#[derive(Debug, Clone, Stream)]
pub struct DiagramStream {
    /// The type name as given by std::any::type_name
    pub type_name: &'static str,
    /// The actual value streamed in
    pub value: serde_json::Value,
}

then we can implement Splittable for this struct so that users of the workflow can connect specific stream output types into the nodes that are able to receive them.

Then we can introduce a built-in stream_out target for the ops schema that converts any incoming type into a DiagramStream instance and streams it out of the workflow. An example might look like this:

{
    "start": "fork",
    "ops": {
        "fork": {
            "type": "forkClone",
            "next": ["stream_out", "op1"]
        },
        "op1": {
            "type": "node",
            "builder": "multiply3",
            "next": "terminate"
        }
    }
}

This means users would not be able to use "stream_out" as an operation ID, similar to "start" and "terminate".

2. Connecting a stream output of one node into the input slot of another node within the same diagram

We can introduce a "stream_to" field for the "node" operation that provides a dictionary of stream output type names along with the target name for where it should be sent. The schema might look something like this:

{
    "start": "op1",
    "ops": {
        "op1": {
            "type": "node",
            "builder": "multiply3",
            "next": "terminate",
            "stream_to": {
                "i32": "stream_out",
                "my_library::MultiplicationError": "handle_errors"
            }
        },
        "handle_errors": {
            "type": "node",
            "builder": "error_hanlder",
            "dispose": true
        }
    }
}

We may need to add a method to the Stream trait to help us convert the Streams generic argument into a dictionary that we can use to match DynOutputs to DynInputSlots.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant