Skip to content

Workflows API Reference

HLQuantum workflows let you orchestrate multi-step quantum–classical pipelines with branching, looping, parallelism, and automatic checkpointing.

Key Concepts

Symbol Purpose
Workflow Ordered collection of nodes; runs async, supports save/resume.
WorkflowRunner Executes circuits and classical functions with optional throttling.
TaskNode Runs a circuit, layer, or callable.
ClassicalNode Runs a classical function, receiving the workflow context dict.
MapNode Applies a transform to a single context key.
PipelineNode Chains multiple classical functions sequentially.
ParallelNode Executes multiple nodes concurrently via asyncio.gather.
LoopNode Repeats a node N times, threading context between iterations.
ConditionalNode Branches on a condition callable (if/else).

Convenience factory functions — Parallel(), Loop(), Branch(), Classical(), Map(), Pipeline() — wrap the node classes with automatic type detection.

Context Propagation

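Each node result is stored in the workflow context under two keys:

  • "previous_result" — always the most recent result.
  • "result_<node_id>" — keyed by node ID for targeted access.

In addition, the context key "results" holds the list of all results collected so far in the current run.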

Classical functions receive the full context dict as their first argument.

Quick Example — Hybrid Quantum → Classical

import asyncio
from hlquantum.circuit import Circuit
from hlquantum.workflows import Workflow, Branch, WorkflowRunner

wf = Workflow(name="HybridPipeline")

# Step 1 — quantum circuit
# A Circuit passed to add() is wrapped in a TaskNode.
wf.add(Circuit(2).h(0).cx(0, 1).measure_all(), name="bell")

# Step 2 — classical: extract counts
# A plain callable is wrapped in a ClassicalNode and is called with the
# workflow context dict; "previous_result" is the result of the node that
# ran immediately before this one.
wf.add(lambda ctx: ctx["previous_result"].counts, name="extract")

# Step 3 — classical: compute correlation
def correlation(ctx):
    # Here "previous_result" is whatever the "extract" step returned.
    # Presumably a mapping of bitstring -> shot count — TODO confirm.
    counts = ctx["previous_result"]
    total = sum(counts.values())
    # NOTE: raises ZeroDivisionError if counts is empty (total == 0).
    return (counts.get("00", 0) + counts.get("11", 0)) / total

wf.add(correlation, name="correlate")

# Step 4 — branch
# Arguments are, in order: the condition, the callable used when the
# condition is truthy, and the callable used otherwise.
wf.add(Branch(
    lambda ctx: ctx["previous_result"] > 0.9,
    lambda ctx: "entangled",
    lambda ctx: "not entangled",
), name="classify")

# run() is a coroutine; it returns a list with one result per node, in order.
results = asyncio.run(wf.run())

Parallelism & Loops

from hlquantum.workflows import Parallel, Loop

# Workflow, circuit_a, circuit_b and circuit_c are not defined in this
# snippet; they are placeholders for objects created elsewhere
# (presumably Circuit instances — the snippet does not show them).
wf = Workflow()
wf.add(Parallel(circuit_a, circuit_b))  # run concurrently
wf.add(Loop(circuit_c, iterations=5))   # repeat 5 times

Save & Resume

# If checkpoint.json already exists, the list of completed node ids is
# loaded from it when the Workflow is constructed.
wf = Workflow(state_file="checkpoint.json")
wf.add(circuit, name="expensive_step")

# First run — saves progress after each node
# Only the ids of completed nodes are written, not their results.
results = asyncio.run(wf.run())

# Later — skips already-completed nodes
# Skipped nodes contribute None to the returned list.
# NOTE(review): nodes are matched by node_id, and the default node_id embeds
# id(self), so it is not stable across processes. Pass node_id=... to add()
# when resuming from a fresh process — confirm against the intended usage.
results = asyncio.run(wf.run(resume=True))

Mermaid Visualisation

# Each top-level node is rendered as N<index>; its label is the node name
# followed by the first four characters of its node_id in parentheses.
print(wf.to_mermaid())
# graph TD
#     N0[bell (Task)]
#     N1[extract (Clas)]
#     N0 --> N1
#     ...

Throttling

# throttling_delay is the number of seconds the runner sleeps (asyncio.sleep)
# before each circuit or classical call it executes; my_backend is a
# placeholder for a backend object created elsewhere.
runner = WorkflowRunner(backend=my_backend, throttling_delay=0.5)
results = asyncio.run(wf.run(runner=runner))

Engine

Workflow

A collection of nodes forming a quantum state machine or execution flow.

Source code in hlquantum/workflows/engine.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class Workflow:
    """An ordered sequence of nodes executed one after another.

    The result of every node is published into a shared ``context`` dict so
    that later nodes can read it.  When *state_file* is given, the ids of
    completed nodes are persisted as JSON after each node so that a later
    ``run(resume=True)`` can skip them.

    Only node ids are checkpointed, not node results: after a resume, skipped
    nodes contribute ``None`` to the returned list and their results are not
    restored into ``context``.
    """

    def __init__(self, state_file: Optional[str] = None, name: str = "HybridWorkflow") -> None:
        """Create an empty workflow.

        Parameters
        ----------
        state_file : str, optional
            Path of the JSON checkpoint file.  If the file already exists the
            list of completed node ids is loaded from it immediately.
        name : str
            Human-readable name used in progress messages.
        """
        self.nodes: List[WorkflowNode] = []
        self.state_file = state_file
        self.name = name
        self.completed_nodes: List[str] = []
        self.context: Dict[str, Any] = {}

        if self.state_file and os.path.exists(self.state_file):
            self._load_state()

    def add(self, action: Any, node_id: Optional[str] = None, name: Optional[str] = None) -> Workflow:
        """Append a node to the workflow and return ``self`` for chaining.

        Parameters
        ----------
        action : Any
            * a ``WorkflowNode`` is appended as-is; a truthy *node_id* / *name*
              overwrites the corresponding attribute on the node;
            * a ``Circuit`` is wrapped in a ``TaskNode``;
            * any other callable is wrapped in a ``ClassicalNode``;
            * anything else is wrapped in a ``TaskNode``.
        node_id : str, optional
            Identifier for the node.
        name : str, optional
            Human-readable name for the node.

        Returns
        -------
        Workflow
            This workflow.
        """
        if not isinstance(action, WorkflowNode):
            if isinstance(action, (Circuit,)):
                action = TaskNode(action, node_id=node_id, name=name)
            elif callable(action):
                # Wrap plain callables as ClassicalNode for first-class support
                action = ClassicalNode(action, node_id=node_id, name=name)
            else:
                action = TaskNode(action, node_id=node_id, name=name)
        else:
            if node_id:
                action.node_id = node_id
            if name:
                action.name = name
        self.nodes.append(action)
        return self

    def _save_state(self) -> None:
        """Write the list of completed node ids to ``state_file`` (if set)."""
        if self.state_file:
            with open(self.state_file, 'w') as f:
                json.dump({"completed_nodes": self.completed_nodes}, f)

    def _load_state(self) -> None:
        """Best-effort load of the completed node ids from ``state_file``.

        An unreadable or malformed checkpoint is ignored.  A checkpoint that
        parses but does not have the expected shape yields an empty list.
        """
        try:
            with open(self.state_file, 'r') as f:
                data = json.load(f)
        except (ValueError, OSError):
            # ValueError covers json.JSONDecodeError and undecodable bytes.
            # A broken checkpoint must not prevent the workflow from starting.
            return

        # Valid JSON is not necessarily an object (e.g. ``[]`` or ``null``).
        completed = data.get("completed_nodes", []) if isinstance(data, dict) else []
        self.completed_nodes = list(completed) if isinstance(completed, list) else []

    async def run(self, runner: Optional[WorkflowRunner] = None, resume: bool = False, verbose: bool = True) -> List[Any]:
        """Execute all nodes in sequence, propagating context between them.

        After each node, ``self.context`` is updated with:

        * ``"previous_result"`` — the most recent result
        * ``"result_<node_id>"`` — the result keyed by node id
        * ``"results"`` — the list of results collected so far in this run

        Parameters
        ----------
        runner : WorkflowRunner, optional
            Runner handed to every node; a default one is created if omitted.
        resume : bool
            When true, nodes whose id is in ``completed_nodes`` are skipped
            and ``None`` is recorded in their place.
        verbose : bool
            When true, progress messages are printed.

        Returns
        -------
        list
            One entry per node, in workflow order.
        """
        if runner is None:
            runner = WorkflowRunner()

        results = []
        total = len(self.nodes)

        if verbose:
            print(f"Starting Workflow: {self.name} [{total} nodes]")

        for i, node in enumerate(self.nodes):
            if resume and node.node_id in self.completed_nodes:
                if verbose:
                    print(f"[{i+1}/{total}] Skipping: {node.name} (id: {node.node_id[:8]}...)")
                results.append(None)
                continue

            if verbose:
                node_type = type(node).__name__
                print(f"[{i+1}/{total}] Running: {node.name} ({node_type})...")

            result = await node.execute(runner, context=self.context)
            results.append(result)

            # Update context so downstream nodes can access prior results
            self.context["previous_result"] = result
            self.context[f"result_{node.node_id}"] = result
            self.context["results"] = results

            # Re-running a workflow must not pile up duplicate ids in the
            # checkpoint.
            if node.node_id not in self.completed_nodes:
                self.completed_nodes.append(node.node_id)
            self._save_state()

        if verbose:
            print(f"Workflow {self.name} completed successfully.")

        return results

    def to_mermaid(self) -> str:
        """Generate a Mermaid ``graph TD`` definition for the workflow.

        Every top-level node becomes ``N<index>``.  A non-empty
        ``ParallelNode`` becomes a subgraph ``SUB<index>`` whose children are
        ``N<index>_<j>``; edges fan out to, and in from, those children.
        Labels are emitted as quoted strings.
        """
        from hlquantum.workflows.nodes import ParallelNode

        def quoted(text: Any) -> str:
            # Unquoted parentheses inside [...] are shape syntax in Mermaid and
            # break parsing, so labels are always quoted; embedded double
            # quotes use Mermaid's entity code.
            return '"' + str(text).replace('"', "#quot;") + '"'

        lines = ["graph TD"]
        prev_ids: List[str] = []
        for i, node in enumerate(self.nodes):
            node_id = f"N{i}"
            label = f"{node.name} ({node.node_id[:4]})"

            if isinstance(node, ParallelNode) and node.nodes:
                current_ids = [f"{node_id}_{j}" for j in range(len(node.nodes))]
                lines.append(f"    subgraph SUB{i} [{quoted(label)}]")
                for sub_id, subnode in zip(current_ids, node.nodes):
                    lines.append(f"        {sub_id}[{quoted(subnode.name)}]")
                lines.append("    end")
            else:
                # Plain nodes, and parallel nodes with no children, are
                # rendered as a single box.
                current_ids = [node_id]
                lines.append(f"    {node_id}[{quoted(label)}]")

            # Only connect ids that were actually declared above.
            for src in prev_ids:
                for dst in current_ids:
                    lines.append(f"    {src} --> {dst}")
            prev_ids = current_ids

        return "\n".join(lines)

add(action, node_id=None, name=None)

Add a node to the workflow.

action can be a WorkflowNode, a Circuit, a Layer, or any callable (classical function). An existing WorkflowNode is appended as-is (a given node_id or name overrides the node's own). A Circuit is wrapped in a TaskNode, any other callable is wrapped in a ClassicalNode, and anything else is wrapped in a TaskNode.

Source code in hlquantum/workflows/engine.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def add(self, action: Any, node_id: Optional[str] = None, name: Optional[str] = None) -> Workflow:
    """Append a node to the workflow and return the workflow itself.

    *action* is handled as follows:

    * a ``WorkflowNode`` is appended unchanged, except that a truthy
      *node_id* / *name* overwrites the node's own attribute;
    * a ``Circuit`` is wrapped in a ``TaskNode``;
    * any other callable is wrapped in a ``ClassicalNode``;
    * anything else is wrapped in a ``TaskNode``.
    """
    if isinstance(action, WorkflowNode):
        # Already a node: only apply the overrides that were supplied.
        if node_id:
            action.node_id = node_id
        if name:
            action.name = name
        node = action
    elif not isinstance(action, Circuit) and callable(action):
        # Plain callables get first-class support as classical steps.
        node = ClassicalNode(action, node_id=node_id, name=name)
    else:
        # Circuits and every remaining object go through TaskNode.
        node = TaskNode(action, node_id=node_id, name=name)

    self.nodes.append(node)
    return self

run(runner=None, resume=False, verbose=True) async

Execute all nodes in sequence, propagating context between them.

Each node result is stored in self.context under two keys:

  • "previous_result" — always the most recent result
  • "result_<node_id>" — keyed by node id for targeted access

Source code in hlquantum/workflows/engine.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def run(self, runner: Optional[WorkflowRunner] = None, resume: bool = False, verbose: bool = True) -> List[Any]:
    """Execute all nodes in sequence, propagating context between them.

    After each node, ``self.context`` is updated with:

    * ``"previous_result"`` — the most recent result
    * ``"result_<node_id>"`` — the result keyed by node id
    * ``"results"`` — the list of results collected so far in this run

    Parameters
    ----------
    runner : WorkflowRunner, optional
        Runner handed to every node; a default one is created if omitted.
    resume : bool
        When true, nodes whose id is in ``completed_nodes`` are skipped and
        ``None`` is recorded in their place.
    verbose : bool
        When true, progress messages are printed.

    Returns
    -------
    list
        One entry per node, in workflow order.
    """
    if runner is None:
        runner = WorkflowRunner()

    results = []
    total = len(self.nodes)

    if verbose:
        print(f"Starting Workflow: {self.name} [{total} nodes]")

    for i, node in enumerate(self.nodes):
        if resume and node.node_id in self.completed_nodes:
            if verbose:
                print(f"[{i+1}/{total}] Skipping: {node.name} (id: {node.node_id[:8]}...)")
            results.append(None)
            continue

        if verbose:
            node_type = type(node).__name__
            print(f"[{i+1}/{total}] Running: {node.name} ({node_type})...")

        result = await node.execute(runner, context=self.context)
        results.append(result)

        # Update context so downstream nodes can access prior results
        self.context["previous_result"] = result
        self.context[f"result_{node.node_id}"] = result
        self.context["results"] = results

        # Re-running a workflow must not pile up duplicate ids in the
        # checkpoint.
        if node.node_id not in self.completed_nodes:
            self.completed_nodes.append(node.node_id)
        self._save_state()

    if verbose:
        print(f"Workflow {self.name} completed successfully.")

    return results

to_mermaid()

Generate a Mermaid diagram definition for the workflow.

Source code in hlquantum/workflows/engine.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def to_mermaid(self) -> str:
    """Generate a Mermaid ``graph TD`` definition for the workflow.

    Every top-level node becomes ``N<index>``.  A non-empty ``ParallelNode``
    becomes a subgraph ``SUB<index>`` whose children are ``N<index>_<j>``;
    edges fan out to, and in from, those children.  Labels are emitted as
    quoted strings.
    """
    from hlquantum.workflows.nodes import ParallelNode

    def quoted(text):
        # Unquoted parentheses inside [...] are shape syntax in Mermaid and
        # break parsing, so labels are always quoted; embedded double quotes
        # use Mermaid's entity code.
        return '"' + str(text).replace('"', "#quot;") + '"'

    lines = ["graph TD"]
    prev_ids = []
    for i, node in enumerate(self.nodes):
        node_id = f"N{i}"
        label = f"{node.name} ({node.node_id[:4]})"

        if isinstance(node, ParallelNode) and node.nodes:
            current_ids = [f"{node_id}_{j}" for j in range(len(node.nodes))]
            lines.append(f"    subgraph SUB{i} [{quoted(label)}]")
            for sub_id, subnode in zip(current_ids, node.nodes):
                lines.append(f"        {sub_id}[{quoted(subnode.name)}]")
            lines.append("    end")
        else:
            # Plain nodes, and parallel nodes with no children, are rendered
            # as a single box.
            current_ids = [node_id]
            lines.append(f"    {node_id}[{quoted(label)}]")

        # Only connect ids that were actually declared above.
        for src in prev_ids:
            for dst in current_ids:
                lines.append(f"    {src} --> {dst}")
        prev_ids = current_ids

    return "\n".join(lines)

WorkflowRunner

Handles execution of workflow nodes with optional throttling and concurrency.

Supports both quantum circuit execution and classical function execution.

Source code in hlquantum/workflows/engine.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class WorkflowRunner:
    """Handles execution of workflow nodes with optional throttling and concurrency.

    Supports both quantum circuit execution and classical function execution.

    Parameters
    ----------
    backend : optional
        Passed through unchanged as ``backend=`` to ``hl_run``.
    throttling_delay : float
        Seconds to sleep before each circuit or classical call; values
        ``<= 0`` disable the delay.
    """

    def __init__(self, backend=None, throttling_delay: float = 0.0) -> None:
        self.backend = backend
        self.throttling_delay = throttling_delay

    async def run_circuit(self, circuit: Circuit) -> Any:
        """Execute a single circuit via ``hl_run`` without blocking the loop."""
        if self.throttling_delay > 0:
            await asyncio.sleep(self.throttling_delay)

        # hl_run is likely synchronous, so we run it in a thread pool to avoid blocking
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: hl_run(circuit, backend=self.backend))

    async def run_classical(self, func, *args, **kwargs) -> Any:
        """Execute a classical function asynchronously.

        If *func* is a coroutine function it is awaited directly.  Otherwise
        it is run in the default thread-pool executor so the event loop is
        not blocked by long-running computations.

        Returns
        -------
        Any
            Whatever ``func(*args, **kwargs)`` returns.
        """
        import inspect
        if self.throttling_delay > 0:
            await asyncio.sleep(self.throttling_delay)
        if inspect.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def run_parallel(self, nodes: List[WorkflowNode], context: Optional[Dict[str, Any]] = None) -> List[Any]:
        """Execute *nodes* concurrently using ``asyncio.gather``.

        All nodes receive the same context object.  Results are returned in
        the order of *nodes*.
        """
        # ``context or {}`` would replace a caller-supplied empty dict with a
        # fresh one; only substitute when no context was given at all.
        ctx = {} if context is None else context
        tasks = [node.execute(self, context=ctx) for node in nodes]
        return await asyncio.gather(*tasks)

run_circuit(circuit) async

Executes a single circuit asynchronously.

Source code in hlquantum/workflows/engine.py
22
23
24
25
26
27
28
29
async def run_circuit(self, circuit: Circuit) -> Any:
    """Execute a single circuit via ``hl_run`` without blocking the loop.

    Sleeps for ``self.throttling_delay`` seconds first when that value is
    positive, then runs ``hl_run(circuit, backend=self.backend)`` in the
    default executor and returns its result.
    """
    if self.throttling_delay > 0:
        await asyncio.sleep(self.throttling_delay)

    # hl_run is likely synchronous, so we run it in a thread pool to avoid blocking
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: hl_run(circuit, backend=self.backend))

run_classical(func, *args, **kwargs) async

Executes a classical function asynchronously.

If the function is a coroutine, it is awaited directly. Otherwise it is run in a thread-pool executor so the event loop is not blocked by long-running computations.

Source code in hlquantum/workflows/engine.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
async def run_classical(self, func, *args, **kwargs) -> Any:
    """Execute a classical function asynchronously.

    If *func* is a coroutine function it is awaited directly.  Otherwise it
    is run in the default thread-pool executor so the event loop is not
    blocked by long-running computations.  When ``self.throttling_delay`` is
    positive, that many seconds are slept first.

    Returns
    -------
    Any
        Whatever ``func(*args, **kwargs)`` returns.
    """
    import inspect
    if self.throttling_delay > 0:
        await asyncio.sleep(self.throttling_delay)
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

run_parallel(nodes, context=None) async

Executes nodes in parallel using asyncio.gather.

Source code in hlquantum/workflows/engine.py
46
47
48
49
50
async def run_parallel(self, nodes: List[WorkflowNode], context: Optional[Dict[str, Any]] = None) -> List[Any]:
    """Execute *nodes* concurrently using ``asyncio.gather``.

    All nodes receive the same context object.  Results are returned in the
    order of *nodes*.
    """
    # ``context or {}`` would replace a caller-supplied empty dict with a
    # fresh one; only substitute when no context was given at all.
    ctx = {} if context is None else context
    tasks = [node.execute(self, context=ctx) for node in nodes]
    return await asyncio.gather(*tasks)

Nodes

hlquantum.workflows.nodes

Nodes for defining hybrid quantum-classical workflows and pipelines. Supports quantum circuits, classical functions, data transformations, and mixed pipelines that chain quantum and classical steps together.

ClassicalNode

Bases: WorkflowNode

Executes a classical (non-quantum) function within a workflow.

The function receives the workflow context dict containing results from prior nodes. It may be synchronous or async.

Parameters

  • func : Callable — A callable that accepts a context dict and returns any value.
  • node_id : str, optional — Unique identifier for this node.
  • name : str, optional — Human-readable name.

Source code in hlquantum/workflows/nodes.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class ClassicalNode(WorkflowNode):
    """Executes a classical (non-quantum) function within a workflow.

    The function receives the workflow context dict containing results from
    prior nodes.  It may be synchronous or async.  A function that declares
    no parameters is called without arguments instead.

    Parameters
    ----------
    func : Callable
        A callable that accepts a context dict and returns any value.
    node_id : str, optional
        Unique identifier for this node.
    name : str, optional
        Human-readable name.

    Raises
    ------
    TypeError
        If *func* is not callable.
    """

    def __init__(self, func: Callable[..., Any], node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        if not callable(func):
            raise TypeError(f"ClassicalNode requires a callable, got {type(func)}")
        self.func = func

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Call the wrapped function and return its result.

        The function is called directly; *runner* is not used here.
        """
        import inspect
        # ``context or {}`` would swap a caller-supplied empty dict for a
        # fresh one, so anything the function stored in it would be lost.
        ctx = {} if context is None else context
        # Pass context only if the function declares at least one parameter
        args = (ctx,) if inspect.signature(self.func).parameters else ()
        result = self.func(*args)
        if inspect.iscoroutinefunction(self.func):
            result = await result
        return result

ConditionalNode

Bases: WorkflowNode

Executes one of two nodes based on a condition.

Source code in hlquantum/workflows/nodes.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class ConditionalNode(WorkflowNode):
    """Executes one of two nodes based on a condition.

    Parameters
    ----------
    condition : Callable
        Sync or async callable.  It is called with the context dict if it
        declares at least one parameter, otherwise without arguments.
    true_node : WorkflowNode
        Node executed when the condition is truthy.
    false_node : WorkflowNode, optional
        Node executed when the condition is falsy.
    node_id : str, optional
        Unique identifier.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, condition: Callable[..., bool], true_node: WorkflowNode, false_node: Optional[WorkflowNode] = None, node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.condition = condition
        self.true_node = true_node
        self.false_node = false_node

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Evaluate the condition and run the matching branch.

        Returns the selected node's result, or ``None`` when the condition is
        falsy and no ``false_node`` was given.
        """
        import inspect
        # Keep a caller-supplied empty dict rather than replacing it, so both
        # the condition and the branch see the caller's context object.
        ctx = {} if context is None else context
        args = (ctx,) if inspect.signature(self.condition).parameters else ()
        cond_val = self.condition(*args)
        if inspect.iscoroutinefunction(self.condition):
            cond_val = await cond_val

        if cond_val:
            return await self.true_node.execute(runner, context=ctx)
        # Compare against None so a node that happens to be falsy still runs.
        if self.false_node is not None:
            return await self.false_node.execute(runner, context=ctx)
        return None

LoopNode

Bases: WorkflowNode

Executes a node in a loop.

Source code in hlquantum/workflows/nodes.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class LoopNode(WorkflowNode):
    """Executes a node in a loop.

    Parameters
    ----------
    body : WorkflowNode
        Node executed on every iteration.
    iterations : int
        Number of times to execute *body*.
    node_id : str, optional
        Unique identifier.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, body: WorkflowNode, iterations: int, node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.body = body
        self.iterations = iterations

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> List[Any]:
        """Run the body ``iterations`` times and return all results in order.

        The first iteration receives the caller's context object itself.
        Every later iteration receives a copy extended with
        ``"previous_result"`` and ``"loop_index"`` from the iteration before.
        """
        results = []
        # Keep a caller-supplied empty dict rather than replacing it.
        ctx = {} if context is None else context
        for i in range(self.iterations):
            result = await self.body.execute(runner, context=ctx)
            results.append(result)
            # Rebinding to a new dict leaves the caller's context unmodified.
            # NOTE(review): "loop_index" seen by the body is the index of the
            # previous iteration and is absent on the first one — confirm
            # this is intended.
            ctx = {**ctx, "previous_result": result, "loop_index": i}
        return results

MapNode

Bases: WorkflowNode

Applies a classical transformation to a specific key from the context.

Useful for post-processing quantum results or transforming data between pipeline stages.

Parameters

  • func : Callable — A function that takes a single value and returns a transformed value.
  • input_key : str — The context key whose value will be passed to func.
  • node_id : str, optional — Unique identifier.
  • name : str, optional — Human-readable name.

Source code in hlquantum/workflows/nodes.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class MapNode(WorkflowNode):
    """Transforms one value taken from the workflow context.

    Handy for post-processing a quantum result or reshaping data between two
    pipeline stages.

    Parameters
    ----------
    func : Callable
        Sync or async function taking one value and returning the
        transformed value.
    input_key : str
        Context key whose value is handed to *func*.
    node_id : str, optional
        Unique identifier.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, func: Callable[[Any], Any], input_key: str = "previous_result", node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        if callable(func):
            self.func = func
            self.input_key = input_key
            return
        raise TypeError(f"MapNode requires a callable, got {type(func)}")

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Apply the function to ``context[input_key]`` (``None`` if missing)."""
        import inspect
        source = context or {}
        outcome = self.func(source.get(self.input_key))
        if inspect.iscoroutinefunction(self.func):
            outcome = await outcome
        return outcome

ParallelNode

Bases: WorkflowNode

Executes multiple nodes in parallel.

Source code in hlquantum/workflows/nodes.py
56
57
58
59
60
61
62
63
64
65
class ParallelNode(WorkflowNode):
    """Groups several nodes so they are executed together by the runner."""

    def __init__(self, nodes: List[WorkflowNode], node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.nodes = nodes

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> List[Any]:
        """Delegate to ``runner.run_parallel`` and return what it returns."""
        # How the child nodes are scheduled is entirely up to the runner.
        gathered = await runner.run_parallel(self.nodes, context=context)
        return gathered

PipelineNode

Bases: WorkflowNode

Chains multiple classical functions into a single node.

Each function receives the output of the previous one, forming a classical processing pipeline. The first function in the chain receives the value from input_key in the workflow context.

Parameters

  • funcs : list of Callable — Ordered list of functions to chain.
  • input_key : str — Context key to seed the first function.
  • node_id : str, optional — Unique identifier.
  • name : str, optional — Human-readable name.

Source code in hlquantum/workflows/nodes.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class PipelineNode(WorkflowNode):
    """Runs a chain of classical functions as one workflow node.

    The value stored under *input_key* in the workflow context is fed to the
    first function; each following function receives the return value of the
    one before it.  Functions may be sync or async.

    Parameters
    ----------
    funcs : list of Callable
        Functions to apply, in order.
    input_key : str
        Context key providing the initial value.
    node_id : str, optional
        Unique identifier.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, funcs: List[Callable], input_key: str = "previous_result", node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        for candidate in funcs:
            if callable(candidate):
                continue
            raise TypeError(f"PipelineNode requires callables, got {type(candidate)}")
        self.funcs = funcs
        self.input_key = input_key

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Thread the seed value through every function and return the result."""
        import inspect
        # A missing key seeds the chain with None.
        value = (context or {}).get(self.input_key)
        for stage in self.funcs:
            value = stage(value)
            if inspect.iscoroutinefunction(stage):
                value = await value
        return value

TaskNode

Bases: WorkflowNode

Executes a single circuit, layer, or callable.

Source code in hlquantum/workflows/nodes.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class TaskNode(WorkflowNode):
    """Runs one circuit, layer, or callable as a workflow step."""

    def __init__(self, action: Union[Circuit, Layer, Callable], node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.action = action

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Execute the wrapped action and return its result.

        A ``Circuit`` is passed to ``runner.run_circuit``.  A ``Layer`` is
        built first and the result is passed to ``runner.run_circuit``.  A
        callable is called directly, receiving *context* only when it
        declares parameters and a context was supplied.

        Raises
        ------
        TypeError
            If the action is none of the supported kinds.
        """
        action = self.action
        if isinstance(action, Circuit):
            return await runner.run_circuit(action)
        if isinstance(action, Layer):
            return await runner.run_circuit(action.build())
        if not callable(action):
            raise TypeError(f"Unsupported action type: {type(action)}")

        import inspect
        takes_args = inspect.signature(action).parameters
        outcome = action(context) if (takes_args and context is not None) else action()
        if inspect.iscoroutinefunction(action):
            outcome = await outcome
        return outcome

WorkflowNode

Bases: ABC

Base class for a node in a workflow.

Source code in hlquantum/workflows/nodes.py
17
18
19
20
21
22
23
24
25
26
class WorkflowNode(ABC):
    """Abstract base for every node that can be placed in a workflow.

    A falsy *node_id* defaults to ``"<ClassName>_<id(self)>"`` and a falsy
    *name* defaults to the class name.
    """

    def __init__(self, node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        kind = self.__class__.__name__
        self.node_id = node_id if node_id else f"{kind}_{id(self)}"
        self.name = name if name else kind

    @abstractmethod
    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Run the node; subclasses must override this."""