Workflows API Reference
HLQuantum workflows let you orchestrate multi-step quantum–classical pipelines with branching, looping, parallelism, and automatic checkpointing.
Key Concepts
| Symbol | Purpose |
| --- | --- |
| `Workflow` | Ordered collection of nodes; runs async, supports save/resume. |
| `WorkflowRunner` | Executes circuits and classical functions with optional throttling. |
| `TaskNode` | Runs a circuit, layer, or callable. |
| `ClassicalNode` | Runs a classical function, receiving the workflow context dict. |
| `MapNode` | Applies a transform to a single context key. |
| `PipelineNode` | Chains multiple classical functions sequentially. |
| `ParallelNode` | Executes multiple nodes concurrently via `asyncio.gather`. |
| `LoopNode` | Repeats a node N times, threading context between iterations. |
| `ConditionalNode` | Branches on a condition callable (if/else). |
Convenience factory functions — Parallel(), Loop(), Branch(), Classical(), Map(), Pipeline() — wrap the node classes with automatic type detection.
Context Propagation
Each node result is stored in the workflow context under two keys:
"previous_result" — always the most recent result.
"result_<node_id>" — keyed by node ID for targeted access.
Classical functions receive the full context dict as their first argument.
Quick Example — Hybrid Quantum → Classical
import asyncio
from hlquantum.circuit import Circuit
from hlquantum.workflows import Workflow, Branch, WorkflowRunner
wf = Workflow(name="HybridPipeline")
# Step 1 — quantum circuit
wf.add(Circuit(2).h(0).cx(0, 1).measure_all(), name="bell")
# Step 2 — classical: extract counts
wf.add(lambda ctx: ctx["previous_result"].counts, name="extract")
# Step 3 — classical: compute correlation
def correlation(ctx):
counts = ctx["previous_result"]
total = sum(counts.values())
return (counts.get("00", 0) + counts.get("11", 0)) / total
wf.add(correlation, name="correlate")
# Step 4 — branch
wf.add(Branch(
lambda ctx: ctx["previous_result"] > 0.9,
lambda ctx: "entangled",
lambda ctx: "not entangled",
), name="classify")
results = asyncio.run(wf.run())
Parallelism & Loops
from hlquantum.workflows import Parallel, Loop
wf = Workflow()
wf.add(Parallel(circuit_a, circuit_b)) # run concurrently
wf.add(Loop(circuit_c, iterations=5)) # repeat 5 times
Save & Resume
wf = Workflow(state_file="checkpoint.json")
wf.add(circuit, name="expensive_step")
# First run — saves progress after each node
results = asyncio.run(wf.run())
# Later — skips already-completed nodes
results = asyncio.run(wf.run(resume=True))
Mermaid Visualisation
print(wf.to_mermaid())
# graph TD
# N0[bell (Task)]
# N1[extract (Clas)]
# N0 --> N1
# ...
Throttling
runner = WorkflowRunner(backend=my_backend, throttling_delay=0.5)
results = asyncio.run(wf.run(runner=runner))
Engine
Workflow
A collection of nodes forming a quantum state machine or execution flow.
Source code in hlquantum/workflows/engine.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
class Workflow:
    """A collection of nodes forming a quantum state machine or execution flow.

    Parameters
    ----------
    state_file : str, optional
        Path to a JSON checkpoint file. When given, completed node ids are
        persisted after every node so a later ``run(resume=True)`` can skip
        already-finished work.
    name : str
        Human-readable workflow name used in progress output.
    """

    def __init__(self, state_file: Optional[str] = None, name: str = "HybridWorkflow") -> None:
        self.nodes: List[WorkflowNode] = []
        self.state_file = state_file
        self.name = name
        self.completed_nodes: List[str] = []
        self.context: Dict[str, Any] = {}
        # Restore checkpoint state eagerly so resume works out of the box.
        if self.state_file and os.path.exists(self.state_file):
            self._load_state()

    def add(self, action: Any, node_id: Optional[str] = None, name: Optional[str] = None) -> Workflow:
        """Add a node to the workflow and return ``self`` for chaining.

        *action* may be a ``WorkflowNode`` (used as-is), a ``Circuit``
        (wrapped in a ``TaskNode``), or any other callable, which is always
        wrapped in a ``ClassicalNode`` so it receives the workflow context
        dict. Anything else falls back to a ``TaskNode``.
        """
        if not isinstance(action, WorkflowNode):
            if isinstance(action, (Circuit,)):
                action = TaskNode(action, node_id=node_id, name=name)
            elif callable(action):
                # Plain callables are classical steps: they get the context
                # dict as their first argument via ClassicalNode.
                action = ClassicalNode(action, node_id=node_id, name=name)
            else:
                action = TaskNode(action, node_id=node_id, name=name)
        elif node_id or name:
            # Allow overriding identity on a pre-built node.
            if node_id:
                action.node_id = node_id
            if name:
                action.name = name
        self.nodes.append(action)
        return self

    def _save_state(self) -> None:
        """Persist the list of completed node ids to the checkpoint file."""
        if self.state_file:
            with open(self.state_file, 'w') as f:
                json.dump({"completed_nodes": self.completed_nodes}, f)

    def _load_state(self) -> None:
        """Load completed node ids from the checkpoint, tolerating corruption."""
        try:
            with open(self.state_file, 'r') as f:
                data = json.load(f)
            self.completed_nodes = data.get("completed_nodes", [])
        except (json.JSONDecodeError, IOError):
            # A corrupt or unreadable checkpoint simply means a fresh start.
            pass

    async def run(self, runner: Optional[WorkflowRunner] = None, resume: bool = False, verbose: bool = True) -> List[Any]:
        """Execute all nodes in sequence, propagating context between them.

        Each node result is stored in ``self.context`` under two keys:

        * ``"previous_result"`` — always the most recent result
        * ``"result_<node_id>"`` — keyed by node id for targeted access

        When *resume* is true, nodes whose ids were recorded as completed are
        skipped and contribute ``None`` to the returned list.
        NOTE(review): skipped nodes do not restore their old results into the
        context, so the first live node after a resume may see no
        ``"previous_result"`` — confirm this is intended.
        """
        if runner is None:
            runner = WorkflowRunner()
        results = []
        total = len(self.nodes)
        if verbose:
            print(f"Starting Workflow: {self.name} [{total} nodes]")
        for i, node in enumerate(self.nodes):
            if resume and node.node_id in self.completed_nodes:
                if verbose:
                    print(f"[{i+1}/{total}] Skipping: {node.name} (id: {node.node_id[:8]}...)")
                # Keep list positions aligned with self.nodes.
                results.append(None)
                continue
            if verbose:
                node_type = type(node).__name__
                print(f"[{i+1}/{total}] Running: {node.name} ({node_type})...")
            result = await node.execute(runner, context=self.context)
            results.append(result)
            # Update context so downstream nodes can access prior results.
            self.context["previous_result"] = result
            self.context[f"result_{node.node_id}"] = result
            self.context["results"] = results
            # Checkpoint immediately so a crash loses at most one node.
            self.completed_nodes.append(node.node_id)
            self._save_state()
        if verbose:
            print(f"Workflow {self.name} completed successfully.")
        return results

    def to_mermaid(self) -> str:
        """Generate a Mermaid diagram definition for the workflow.

        Parallel nodes are rendered as subgraphs; the subgraph id itself is
        used as the edge endpoint.  (Previously edges always targeted
        ``N<i>``, which was never declared when node *i* was a
        ``ParallelNode``, producing a dangling node in the diagram.)
        """
        from hlquantum.workflows.nodes import ParallelNode
        lines = ["graph TD"]
        prev_id: Optional[str] = None
        for i, node in enumerate(self.nodes):
            label = f"{node.name} ({node.node_id[:4]})"
            if isinstance(node, ParallelNode):
                current_id = f"SUB{i}"
                lines.append(f"    subgraph {current_id} [{label}]")
                for j, subnode in enumerate(node.nodes):
                    lines.append(f"        N{i}_{j}[{subnode.name}]")
                lines.append("    end")
            else:
                current_id = f"N{i}"
                lines.append(f"    {current_id}[{label}]")
            if prev_id is not None:
                lines.append(f"    {prev_id} --> {current_id}")
            prev_id = current_id
        return "\n".join(lines)
|
add(action, node_id=None, name=None)
Add a node to the workflow.
action may be a WorkflowNode (used as-is), a Circuit (wrapped in a
TaskNode), or any other callable, which is always wrapped in a
ClassicalNode so it receives the workflow context dict. Anything else
falls back to a TaskNode.
Source code in hlquantum/workflows/engine.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def add(self, action: Any, node_id: Optional[str] = None, name: Optional[str] = None) -> Workflow:
    """Add a node to the workflow and return ``self`` for chaining.

    *action* may be a ``WorkflowNode`` (used as-is), a ``Circuit``
    (wrapped in a ``TaskNode``), or any other callable, which is always
    wrapped in a ``ClassicalNode`` so it receives the workflow context
    dict.  Anything else falls back to a ``TaskNode``.
    """
    if not isinstance(action, WorkflowNode):
        if isinstance(action, (Circuit,)):
            action = TaskNode(action, node_id=node_id, name=name)
        elif callable(action):
            # Wrap plain callables as ClassicalNode for first-class support
            action = ClassicalNode(action, node_id=node_id, name=name)
        else:
            action = TaskNode(action, node_id=node_id, name=name)
    elif node_id or name:
        # Caller may override identity on a pre-built node.
        if node_id: action.node_id = node_id
        if name: action.name = name
    self.nodes.append(action)
    return self
|
run(runner=None, resume=False, verbose=True)
async
Execute all nodes in sequence, propagating context between them.
Each node result is stored in self.context under two keys:
* "previous_result" — always the most recent result
* "result_<node_id>" — keyed by node id for targeted access
Source code in hlquantum/workflows/engine.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
async def run(self, runner: Optional[WorkflowRunner] = None, resume: bool = False, verbose: bool = True) -> List[Any]:
    """Execute all nodes in sequence, propagating context between them.

    Each node result is stored in ``self.context`` under two keys:

    * ``"previous_result"`` — always the most recent result
    * ``"result_<node_id>"`` — keyed by node id for targeted access

    Parameters
    ----------
    runner : WorkflowRunner, optional
        Executor for the nodes; a default runner is created when omitted.
    resume : bool
        Skip nodes whose ids were recorded as completed in a prior run.
    verbose : bool
        Print per-node progress to stdout.

    Returns
    -------
    list
        One entry per node, in order; skipped nodes contribute ``None``.

    NOTE(review): skipped nodes do not restore their old results into the
    context, so the first live node after a resume may see no
    ``"previous_result"`` — confirm this is intended.
    """
    if runner is None:
        runner = WorkflowRunner()
    results = []
    total = len(self.nodes)
    if verbose:
        print(f"Starting Workflow: {self.name} [{total} nodes]")
    for i, node in enumerate(self.nodes):
        if resume and node.node_id in self.completed_nodes:
            if verbose:
                print(f"[{i+1}/{total}] Skipping: {node.name} (id: {node.node_id[:8]}...)")
            # Keep list positions aligned with self.nodes.
            results.append(None)
            continue
        if verbose:
            node_type = type(node).__name__
            print(f"[{i+1}/{total}] Running: {node.name} ({node_type})...")
        result = await node.execute(runner, context=self.context)
        results.append(result)
        # Update context so downstream nodes can access prior results
        self.context["previous_result"] = result
        self.context[f"result_{node.node_id}"] = result
        self.context["results"] = results
        # Checkpoint after every node so a crash loses at most one step.
        self.completed_nodes.append(node.node_id)
        self._save_state()
    if verbose:
        print(f"Workflow {self.name} completed successfully.")
    return results
|
to_mermaid()
Generate a Mermaid diagram definition for the workflow.
Source code in hlquantum/workflows/engine.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def to_mermaid(self) -> str:
    """Generate a Mermaid diagram definition for the workflow.

    Nodes are emitted in order and chained with ``-->`` edges.  Parallel
    nodes are rendered as subgraphs; the subgraph id itself is used as the
    edge endpoint so no dangling ``N<i>`` node is created (the previous
    version always drew edges to ``N<i>``, which was never declared when
    node *i* was a ``ParallelNode``).
    """
    from hlquantum.workflows.nodes import ParallelNode
    lines = ["graph TD"]
    prev_id = None
    for i, node in enumerate(self.nodes):
        label = f"{node.name} ({node.node_id[:4]})"
        if isinstance(node, ParallelNode):
            current_id = f"SUB{i}"
            lines.append(f"    subgraph {current_id} [{label}]")
            for j, subnode in enumerate(node.nodes):
                lines.append(f"        N{i}_{j}[{subnode.name}]")
            lines.append("    end")
        else:
            current_id = f"N{i}"
            lines.append(f"    {current_id}[{label}]")
        if prev_id is not None:
            lines.append(f"    {prev_id} --> {current_id}")
        prev_id = current_id
    return "\n".join(lines)
|
WorkflowRunner
Handles execution of workflow nodes with optional throttling and concurrency.
Supports both quantum circuit execution and classical function execution.
Source code in hlquantum/workflows/engine.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class WorkflowRunner:
    """Handles execution of workflow nodes with optional throttling and concurrency.

    Supports both quantum circuit execution and classical function execution.

    Parameters
    ----------
    backend : optional
        Backend handle forwarded to ``hl_run``; ``None`` selects the default.
    throttling_delay : float
        Seconds to sleep before each execution (0.0 disables throttling).
    """

    def __init__(self, backend=None, throttling_delay: float = 0.0) -> None:
        self.backend = backend
        self.throttling_delay = throttling_delay

    async def _throttle(self) -> None:
        """Sleep for the configured delay, if any, to rate-limit submissions."""
        if self.throttling_delay > 0:
            await asyncio.sleep(self.throttling_delay)

    async def run_circuit(self, circuit: Circuit) -> Any:
        """Execute a single circuit asynchronously.

        ``hl_run`` is assumed synchronous, so it is dispatched to the
        default thread-pool executor to keep the event loop responsive.
        """
        await self._throttle()
        # get_running_loop() is the supported call inside a coroutine;
        # get_event_loop() is deprecated in this position since Python 3.10.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: hl_run(circuit, backend=self.backend))

    async def run_classical(self, func, *args, **kwargs) -> Any:
        """Execute a classical function asynchronously.

        Coroutine functions are awaited directly; plain callables run in the
        default thread-pool executor so long-running computations do not
        block the event loop.
        """
        import inspect
        await self._throttle()
        if inspect.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def run_parallel(self, nodes: List[WorkflowNode], context: Optional[Dict[str, Any]] = None) -> List[Any]:
        """Execute *nodes* concurrently via ``asyncio.gather``, sharing one context.

        Only ``None`` triggers a fresh context; a caller-supplied empty dict
        is passed through unchanged so mutations remain visible to the
        caller.  (``context or {}`` used to discard an empty dict.)
        """
        ctx = context if context is not None else {}
        tasks = [node.execute(self, context=ctx) for node in nodes]
        return await asyncio.gather(*tasks)
|
run_circuit(circuit)
async
Executes a single circuit asynchronously.
Source code in hlquantum/workflows/engine.py
async def run_circuit(self, circuit: Circuit) -> Any:
    """Execute a single circuit asynchronously.

    ``hl_run`` is assumed to be synchronous, so it is dispatched to the
    default thread-pool executor to keep the event loop responsive.
    """
    if self.throttling_delay > 0:
        await asyncio.sleep(self.throttling_delay)
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated in this position since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: hl_run(circuit, backend=self.backend))
|
run_classical(func, *args, **kwargs)
async
Executes a classical function asynchronously.
If the function is a coroutine, it is awaited directly.
Otherwise it is run in a thread-pool executor so the event loop
is not blocked by long-running computations.
Source code in hlquantum/workflows/engine.py
31
32
33
34
35
36
37
38
39
40
41
42
43
async def run_classical(self, func, *args, **kwargs) -> Any:
    """Execute a classical function asynchronously.

    Coroutine functions are awaited directly; plain callables run in the
    default thread-pool executor so long-running computations do not block
    the event loop.
    """
    import inspect
    if self.throttling_delay > 0:
        await asyncio.sleep(self.throttling_delay)
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated in this position since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
|
run_parallel(nodes, context=None)
async
Executes nodes in parallel using asyncio.gather.
Source code in hlquantum/workflows/engine.py
async def run_parallel(self, nodes: List[WorkflowNode], context: Optional[Dict[str, Any]] = None) -> List[Any]:
    """Run every node concurrently and gather their results in input order."""
    shared = context or {}
    return await asyncio.gather(*(n.execute(self, context=shared) for n in nodes))
|
Nodes
hlquantum.workflows.nodes
~~~~~~~~~~~~~~~~~~~~~~~~~
Nodes for defining hybrid quantum-classical workflows and pipelines.
Supports quantum circuits, classical functions, data transformations,
and mixed pipelines that chain quantum and classical steps together.
ClassicalNode
Bases: WorkflowNode
Executes a classical (non-quantum) function within a workflow.
The function receives the workflow context dict containing results from
prior nodes. It may be synchronous or async.
Parameters
func : Callable
A callable that accepts a context dict and returns any value.
node_id : str, optional
Unique identifier for this node.
name : str, optional
Human-readable name.
Source code in hlquantum/workflows/nodes.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class ClassicalNode(WorkflowNode):
    """Run a classical (non-quantum) function as a workflow step.

    The wrapped callable may be synchronous or async.  If it declares at
    least one parameter it receives the workflow context dict (which holds
    results from prior nodes); otherwise it is invoked with no arguments.

    Parameters
    ----------
    func : Callable
        A callable that accepts a context dict and returns any value.
    node_id : str, optional
        Unique identifier for this node.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, func: Callable[..., Any], node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        if not callable(func):
            raise TypeError(f"ClassicalNode requires a callable, got {type(func)}")
        self.func = func

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        import inspect
        ctx = context or {}
        # Only hand over the context when the callable declares parameters.
        call_args = (ctx,) if inspect.signature(self.func).parameters else ()
        if inspect.iscoroutinefunction(self.func):
            return await self.func(*call_args)
        return self.func(*call_args)
ConditionalNode
Bases: WorkflowNode
Executes one of two nodes based on a condition.
Source code in hlquantum/workflows/nodes.py
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
class ConditionalNode(WorkflowNode):
    """Branch between two nodes based on a (possibly async) condition."""

    def __init__(self, condition: Callable[..., bool], true_node: WorkflowNode, false_node: Optional[WorkflowNode] = None, node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.condition = condition
        self.true_node = true_node
        self.false_node = false_node

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        import inspect
        ctx = context or {}
        # Feed the context only when the predicate declares parameters.
        args = (ctx,) if inspect.signature(self.condition).parameters else ()
        if inspect.iscoroutinefunction(self.condition):
            outcome = await self.condition(*args)
        else:
            outcome = self.condition(*args)
        if outcome:
            return await self.true_node.execute(runner, context=ctx)
        if self.false_node:
            return await self.false_node.execute(runner, context=ctx)
        # No else-branch configured: a falsy condition yields None.
        return None
|
LoopNode
Bases: WorkflowNode
Executes a node in a loop.
Source code in hlquantum/workflows/nodes.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
class LoopNode(WorkflowNode):
    """Run a body node a fixed number of times, threading context through."""

    def __init__(self, body: WorkflowNode, iterations: int, node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.body = body
        self.iterations = iterations

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> List[Any]:
        ctx = context or {}
        collected: List[Any] = []
        for index in range(self.iterations):
            outcome = await self.body.execute(runner, context=ctx)
            collected.append(outcome)
            # Later iterations see the previous result and their loop index
            # via a fresh copy, leaving the caller's dict untouched.
            ctx = {**ctx, "previous_result": outcome, "loop_index": index}
        return collected
|
MapNode
Bases: WorkflowNode
Applies a classical transformation to a specific key from the context.
Useful for post-processing quantum results or transforming data between
pipeline stages.
Parameters
func : Callable
A function that takes a single value and returns a transformed value.
input_key : str
The context key whose value will be passed to func.
node_id : str, optional
Unique identifier.
name : str, optional
Human-readable name.
Source code in hlquantum/workflows/nodes.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class MapNode(WorkflowNode):
    """Apply a classical transformation to one value from the context.

    Handy for post-processing quantum results or reshaping data between
    pipeline stages.

    Parameters
    ----------
    func : Callable
        One-argument function producing the transformed value.
    input_key : str
        Context key whose value is fed to *func*.
    node_id : str, optional
        Unique identifier.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, func: Callable[[Any], Any], input_key: str = "previous_result", node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        if not callable(func):
            raise TypeError(f"MapNode requires a callable, got {type(func)}")
        self.func = func
        self.input_key = input_key

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        import inspect
        # Missing keys simply produce None for the transform to handle.
        source = (context or {}).get(self.input_key)
        if inspect.iscoroutinefunction(self.func):
            return await self.func(source)
        return self.func(source)
|
ParallelNode
Bases: WorkflowNode
Executes multiple nodes in parallel.
Source code in hlquantum/workflows/nodes.py
56
57
58
59
60
61
62
63
64
class ParallelNode(WorkflowNode):
    """Fan out to several child nodes at once; the runner provides concurrency."""

    def __init__(self, nodes: List[WorkflowNode], node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.nodes = nodes

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> List[Any]:
        # Delegation keeps scheduling policy (gather, throttling) in one place.
        return await runner.run_parallel(self.nodes, context=context)
|
PipelineNode
Bases: WorkflowNode
Chains multiple classical functions into a single node.
Each function receives the output of the previous one, forming a
classical processing pipeline. The first function in the chain
receives the value from input_key in the workflow context.
Parameters
funcs : list of Callable
Ordered list of functions to chain.
input_key : str
Context key to seed the first function.
node_id : str, optional
Unique identifier.
name : str, optional
Human-readable name.
Source code in hlquantum/workflows/nodes.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
class PipelineNode(WorkflowNode):
    """Chain several classical functions into one workflow step.

    The first function is seeded with the context value at *input_key*;
    every subsequent function consumes its predecessor's output.

    Parameters
    ----------
    funcs : list of Callable
        Ordered list of functions to chain.
    input_key : str
        Context key to seed the first function.
    node_id : str, optional
        Unique identifier.
    name : str, optional
        Human-readable name.
    """

    def __init__(self, funcs: List[Callable], input_key: str = "previous_result", node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        for candidate in funcs:
            if not callable(candidate):
                raise TypeError(f"PipelineNode requires callables, got {type(candidate)}")
        self.funcs = funcs
        self.input_key = input_key

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        import inspect
        current = (context or {}).get(self.input_key)
        for stage in self.funcs:
            # Async stages are awaited; sync stages are called inline.
            current = await stage(current) if inspect.iscoroutinefunction(stage) else stage(current)
        return current
|
TaskNode
Bases: WorkflowNode
Executes a single circuit, layer, or callable.
Source code in hlquantum/workflows/nodes.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class TaskNode(WorkflowNode):
    """Run a single circuit, layer, or callable as one workflow step."""

    def __init__(self, action: Union[Circuit, Layer, Callable], node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        super().__init__(node_id, name)
        self.action = action

    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        action = self.action
        if isinstance(action, Circuit):
            return await runner.run_circuit(action)
        if isinstance(action, Layer):
            # Layers are materialised into a circuit before execution.
            return await runner.run_circuit(action.build())
        if callable(action):
            import inspect
            # Hand over the context only when the callable can take it.
            wants_ctx = bool(inspect.signature(action).parameters) and context is not None
            args = (context,) if wants_ctx else ()
            if inspect.iscoroutinefunction(action):
                return await action(*args)
            return action(*args)
        raise TypeError(f"Unsupported action type: {type(action)}")
|
WorkflowNode
Bases: ABC
Base class for a node in a workflow.
Source code in hlquantum/workflows/nodes.py
17
18
19
20
21
22
23
24
25
class WorkflowNode(ABC):
    """Base class for a node in a workflow.

    Every node carries a ``node_id`` (used for checkpoint bookkeeping and
    context keys) and a human-readable ``name``.
    """

    # Monotonic counter for auto-generated ids.  The previous scheme used
    # ``id(self)`` (a memory address), which changes between processes and
    # therefore broke checkpoint resume: ids saved to the state file could
    # never match the ids generated by the next run.  A per-process counter
    # is stable as long as the workflow is constructed in the same order.
    _auto_id_counter = 0

    def __init__(self, node_id: Optional[str] = None, name: Optional[str] = None) -> None:
        if node_id is None:
            WorkflowNode._auto_id_counter += 1
            node_id = f"{self.__class__.__name__}_{WorkflowNode._auto_id_counter}"
        self.node_id = node_id
        self.name = name or self.__class__.__name__

    @abstractmethod
    async def execute(self, runner: "WorkflowRunner", context: Optional[Dict[str, Any]] = None) -> Any:
        """Run this node with *runner*, optionally reading/writing *context*."""
        pass
|