Building a Pipeline Compiler: From React Flow to Temporal Workflows
Cupel's visual pipeline builder lets teams drag components onto a canvas, connect them with edges, and deploy executable data pipelines. Behind the scenes, a pipeline compiler transforms the React Flow JSON representation into Temporal workflow Python code. This post walks through each stage of that compilation process, the design decisions behind it, and how column-level lineage is captured as a natural byproduct.
The Compilation Pipeline
The compiler executes five distinct stages, each producing an intermediate artifact consumed by the next. This staged approach keeps each transformation simple and testable in isolation.
React Flow JSON
|
v
[1. DAG Validation]
|
v
[2. Component Resolution]
|
v
[3. Policy Overlay]
|
v
[4. Intermediate Representation (IR)]
|
v
[5. Temporal Code Generation]
|
v
Python Workflow + Lineage Metadata
Stage 1: DAG Validation
The first stage validates the structural integrity of the visual graph. React Flow stores nodes and edges as JSON arrays. The compiler parses this JSON and checks three invariants before proceeding.
Cycle Detection
Data pipelines must be directed acyclic graphs. A cycle would mean a stage depends on its own output, creating an infinite loop. The compiler uses Kahn's algorithm (topological sort via in-degree counting) to detect cycles:
from collections import defaultdict, deque

class CompilationError(Exception):
    """Raised when a pipeline fails to compile."""

def validate_dag(nodes: list[dict], edges: list[dict]) -> list[str]:
    """Validate that the graph is a DAG and return topological order.

    Uses Kahn's algorithm: repeatedly remove nodes with zero in-degree.
    If all nodes are removed, the graph is acyclic. If nodes remain,
    the graph contains at least one cycle.

    Args:
        nodes: React Flow node definitions.
        edges: React Flow edge definitions.

    Returns:
        List of node IDs in topological order.

    Raises:
        CompilationError: If the graph contains a cycle or disconnected inputs.
    """
    in_degree = defaultdict(int)
    adjacency = defaultdict(list)
    node_ids = {n["id"] for n in nodes}

    for edge in edges:
        adjacency[edge["source"]].append(edge["target"])
        in_degree[edge["target"]] += 1

    # Seed queue with nodes that have no incoming edges (sources)
    queue = deque(nid for nid in node_ids if in_degree[nid] == 0)

    order = []
    while queue:
        current = queue.popleft()
        order.append(current)
        for neighbor in adjacency[current]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)

    if len(order) != len(node_ids):
        cycle_nodes = node_ids - set(order)
        raise CompilationError(
            f"Pipeline contains a cycle involving nodes: {sorted(cycle_nodes)}"
        )
    return order
Input Completeness
Every non-source node must have at least one incoming edge. A transform node with no input connection is a configuration error that should be caught before code generation, not at runtime.
Type Compatibility
Edges carry a type annotation indicating the data format flowing between nodes (tabular, file reference, stream). The compiler validates that connected ports have compatible types. A Parquet file source cannot connect directly to a SQL transform input without an explicit format conversion node in between.
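The completeness and type checks are straightforward to sketch. The field names below (`component_type` on the node data, `sourceType`/`targetType` on edges) are illustrative assumptions, not Cupel's actual schema:

```python
# Sketch of the input-completeness and type-compatibility checks.
# CompilationError is re-declared here so the sketch runs standalone.
class CompilationError(Exception):
    """Raised when a pipeline fails to compile."""

def validate_inputs_and_types(nodes: list[dict], edges: list[dict]) -> None:
    """Reject non-source nodes with no inputs and edges with mismatched port types."""
    targets = {e["target"] for e in edges}
    for node in nodes:
        # Every non-source node must have at least one incoming edge
        if node["data"]["component_type"] != "source" and node["id"] not in targets:
            raise CompilationError(f"Node {node['id']} has no input connection")
    for edge in edges:
        # Connected ports must agree on the data format flowing between them
        if edge["sourceType"] != edge["targetType"]:
            raise CompilationError(
                f"Type mismatch on edge {edge['source']} -> {edge['target']}: "
                f"{edge['sourceType']} vs {edge['targetType']}"
            )
```

In the real compiler the format-conversion rule (e.g. Parquet source to SQL transform) would consult the component registry rather than a flat field, but the shape of the check is the same.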
Stage 2: Component Resolution
Each node in the React Flow graph references a component by ID and version. The resolver fetches the full component definition, including its configuration schema, execution contract, and default settings:
from dataclasses import dataclass

@dataclass
class ResolvedComponent:
    """Fully resolved component with pinned version and config."""
    component_id: str
    version: str
    component_type: str  # source, transform, quality_gate, compliance, destination
    execution_config: dict
    input_schema: dict
    output_schema: dict
    failure_strategy: str
    timeout_seconds: int
def resolve_components(
    nodes: list[dict],
    component_registry: ComponentRegistry,
) -> dict[str, ResolvedComponent]:
    """Resolve each node to its pinned component version.

    Fetches the component definition from the registry, validates
    the node's configuration against the component's schema, and
    returns the resolved components keyed by node ID.

    Args:
        nodes: React Flow nodes with component_id and version in data.
        component_registry: Service for fetching component definitions.

    Returns:
        Mapping of node ID to resolved component.

    Raises:
        CompilationError: If a component version is not found or config is invalid.
    """
    resolved = {}
    for node in nodes:
        comp = component_registry.get(
            node["data"]["component_id"],
            node["data"]["version"],
        )
        if comp is None:
            raise CompilationError(
                f"Component {node['data']['component_id']}@{node['data']['version']} not found"
            )
        # Validate node config against component schema
        validate_config(node["data"]["config"], comp.config_schema)
        resolved[node["id"]] = ResolvedComponent(
            component_id=comp.id,
            version=comp.version,
            component_type=comp.type,
            execution_config=node["data"]["config"],
            input_schema=comp.input_schema,
            output_schema=comp.output_schema,
            failure_strategy=node["data"].get("failure_strategy", comp.default_failure_strategy),
            timeout_seconds=node["data"].get("timeout", comp.default_timeout),
        )
    return resolved
Component resolution uses the exact pinned version specified on the node, not the latest version. This guarantees that a pipeline deployed today produces the same execution plan six months from now, even if the component has been updated. Users opt into new versions explicitly through the UI.
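As a concrete illustration of what pinning looks like on the wire, a node payload might carry a reference like the following (field names mirror the resolver above; the specific IDs and config are hypothetical):

```python
# Hypothetical React Flow node payload with a pinned component version.
node = {
    "id": "transform-1",
    "type": "pipelineNode",
    "data": {
        "component_id": "sql_transform",
        "version": "2.3.1",  # pinned: resolution never floats to "latest"
        "config": {"query": "SELECT * FROM staging.orders"},
    },
}
```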
Stage 3: Policy Overlay
This is where Cupel's hierarchical governance model is applied. The compiler merges policies from three levels (organization, team, and pipeline) following a strict "more restrictive only" rule:
def apply_policy_overlay(
    resolved: dict[str, ResolvedComponent],
    org_policies: PolicySet,
    team_policies: PolicySet,
    pipeline_policies: PolicySet,
) -> dict[str, ResolvedComponent]:
    """Apply hierarchical policy overlays to resolved components.

    Policies merge downward: org -> team -> pipeline. Team policies
    can only be MORE restrictive than org policies. Pipeline policies
    can only be MORE restrictive than team policies.

    For example, if the org requires a minimum quality gate threshold of 95%,
    the team can set 98%, and the pipeline can set 99%, but none can go below 95%.

    Args:
        resolved: Components from Stage 2.
        org_policies: Organization-level baseline policies.
        team_policies: Team-level override policies.
        pipeline_policies: Pipeline-level override policies.

    Returns:
        Components with merged policy constraints applied.
    """
    merged = merge_policies(org_policies, team_policies, pipeline_policies)
    for node_id, component in resolved.items():
        # Inject the mandatory PII scan hook if not already present
        if merged.requires_pii_scan and component.component_type == "source":
            hooks = component.execution_config.setdefault("post_hooks", [])
            if not any(h.get("type") == "pii_scan" for h in hooks):
                hooks.append({"type": "pii_scan", "policy": merged.pii_policy})

        # Enforce minimum quality thresholds
        if component.component_type == "quality_gate":
            configured_threshold = component.execution_config.get("threshold", 0)
            component.execution_config["threshold"] = max(
                configured_threshold, merged.min_quality_threshold
            )

        # Enforce encryption on destinations
        if component.component_type == "destination":
            component.execution_config["encryption"] = merged.encryption_standard
    return resolved
This stage ensures that every pipeline, regardless of who builds it, complies with the organization's baseline security and compliance requirements.
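The merge_policies helper referenced above isn't shown in this post. A minimal sketch of the "more restrictive only" rule, assuming a simplified PolicySet with just two fields (the real type carries more), could look like:

```python
from dataclasses import dataclass

# Simplified stand-in for the PolicySet used above (illustrative fields only).
@dataclass
class PolicySet:
    min_quality_threshold: float = 0.0
    requires_pii_scan: bool = False

def merge_policies(org: PolicySet, team: PolicySet, pipeline: PolicySet) -> PolicySet:
    """Merge org -> team -> pipeline, always keeping the most restrictive value."""
    return PolicySet(
        # Numeric thresholds may only ratchet upward at lower levels
        min_quality_threshold=max(
            org.min_quality_threshold,
            team.min_quality_threshold,
            pipeline.min_quality_threshold,
        ),
        # A PII-scan requirement, once set, cannot be dropped downstream
        requires_pii_scan=org.requires_pii_scan
        or team.requires_pii_scan
        or pipeline.requires_pii_scan,
    )

# Matches the docstring example: 95% org floor, 98% team, 99% pipeline -> 99%
# merge_policies(PolicySet(0.95), PolicySet(0.98), PolicySet(0.99)).min_quality_threshold == 0.99
```

The key design point is that merging is per-field with a restrictiveness ordering, not a blanket dict overwrite: a team setting a *looser* value than the org is silently clamped back to the org baseline.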
Stage 4: Intermediate Representation
The compiler generates an intermediate representation (IR) that captures the execution plan as a data structure independent of both React Flow and Temporal:
@dataclass
class IRStage:
    """Single stage in the intermediate representation."""
    stage_id: str
    component: ResolvedComponent
    dependencies: list[str]  # stage_ids that must complete before this stage
    retry_policy: RetryConfig
    timeout: timedelta
    condition: str | None  # Optional entry condition expression
    lineage_inputs: list[ColumnRef]
    lineage_outputs: list[ColumnRef]

@dataclass
class PipelineIR:
    """Complete intermediate representation of a compiled pipeline."""
    pipeline_id: str
    stages: list[IRStage]  # Topologically sorted
    execution_order: list[list[str]]  # Grouped by parallelism level
    total_timeout: timedelta
    lineage_graph: dict[ColumnRef, list[ColumnRef]]
The IR groups stages into parallelism levels. Stages within the same level have no dependencies on each other and can execute concurrently. Stages at level N+1 depend on at least one stage at level N:
Level 0: [source_a, source_b] # Execute in parallel
Level 1: [join_ab] # Depends on both sources
Level 2: [quality_gate, pii_scan] # Can run in parallel after join
Level 3: [gold_aggregation] # Depends on quality gate
Level 4: [destination_snowflake] # Final output
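Grouping stages into levels is a longest-path computation over the dependency graph. A sketch, assuming each stage's dependency IDs are available as a plain dict (a simplification of the IRStage fields above):

```python
def compute_levels(dependencies: dict[str, list[str]]) -> list[list[str]]:
    """Group stage IDs into parallelism levels.

    A stage's level is one more than the maximum level of its
    dependencies; stages with no dependencies sit at level 0.
    Assumes the input is already a validated DAG (Stage 1).
    """
    levels: dict[str, int] = {}

    def level_of(stage_id: str) -> int:
        if stage_id not in levels:
            deps = dependencies.get(stage_id, [])
            levels[stage_id] = 1 + max((level_of(d) for d in deps), default=-1)
        return levels[stage_id]

    for stage_id in dependencies:
        level_of(stage_id)

    if not levels:
        return []
    grouped: list[list[str]] = [[] for _ in range(max(levels.values()) + 1)]
    for stage_id, lvl in levels.items():
        grouped[lvl].append(stage_id)
    return grouped
```

Feeding in the example pipeline's dependencies reproduces the five levels shown above, with source_a/source_b at level 0 and the Snowflake destination at level 4.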
Stage 5: Temporal Code Generation
The final stage translates the IR into executable Temporal workflow Python code. Each IR stage becomes a Temporal activity invocation, and parallelism levels become asyncio.gather calls:
def generate_temporal_workflow(ir: PipelineIR) -> str:
    """Generate Temporal workflow Python code from the IR.

    Produces a self-contained Python module defining a Temporal
    workflow class and its activity stubs. The workflow mirrors
    the IR execution order, with parallel stages wrapped in
    asyncio.gather calls.

    Args:
        ir: The pipeline intermediate representation.

    Returns:
        Python source code string for the Temporal workflow.
    """
    lines = [
        "from temporalio import workflow, activity",
        "from datetime import timedelta",
        "import asyncio",
        "",
        f'@workflow.defn(name="pipeline-{ir.pipeline_id}")',
        f"class Pipeline_{ir.pipeline_id.replace('-', '_')}:",
        "    @workflow.run",
        "    async def run(self, config: dict) -> dict:",
        "        results = {}",
    ]
    for level in ir.execution_order:
        if len(level) == 1:
            # Single stage: sequential execution
            stage = ir.get_stage(level[0])
            lines.extend(_generate_activity_call(stage))
        else:
            # Multiple stages: parallel execution via gather
            lines.append("        parallel_results = await asyncio.gather(")
            for stage_id in level:
                stage = ir.get_stage(stage_id)
                lines.append("            workflow.execute_activity(")
                lines.append(f"                run_{stage.component.component_type},")
                lines.append("                args=[config, results],")
                lines.append(f"                start_to_close_timeout=timedelta(seconds={stage.timeout.total_seconds()}),")
                lines.append("            ),")
            lines.append("        )")
    lines.append("        return results")
    return "\n".join(lines)
The generated code is not meant to be hand-edited. It is a compilation artifact that the Temporal runtime executes. However, Cupel's dual editor shows this generated code in the code view, allowing advanced users to inspect and understand exactly what will execute when the pipeline runs.
Lineage as a Byproduct
Column-level lineage is captured during compilation, not computed after the fact. Each component in the registry declares its input and output column mappings. During Stage 4 (IR generation), the compiler traces column references through the graph and builds a complete lineage map:
def build_lineage_graph(
    stages: list[IRStage],
    edges: list[dict],
) -> dict[ColumnRef, list[ColumnRef]]:
    """Build column-level lineage from resolved component schemas.

    Each component declares which input columns produce which output
    columns. By tracing these declarations through the DAG, the compiler
    constructs a complete lineage graph from source columns to
    destination columns without any runtime instrumentation.

    Returns:
        Mapping of each output column to its source column(s).
    """
    lineage = {}
    for stage in stages:
        for output_col in stage.lineage_outputs:
            source_cols = trace_column_origin(output_col, stage, stages, edges)
            lineage[output_col] = source_cols
    return lineage
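The trace_column_origin helper isn't shown above. With a deliberately simplified signature (a flat dict of per-stage column mappings rather than the IRStage/edges pair used in the real call), the recursive trace might look like:

```python
def trace_column_origin(
    column: str,
    stage_id: str,
    column_mappings: dict[str, dict[str, list[tuple[str, str]]]],
) -> list[tuple[str, str]]:
    """Trace an output column back through declared mappings to its source columns.

    column_mappings is a hypothetical shape for this sketch:
    stage_id -> {output_column: [(upstream_stage, upstream_column), ...]}.
    A stage absent from the mapping (or an unmapped column) is treated
    as a source, terminating the trace.
    """
    parents = column_mappings.get(stage_id, {}).get(column)
    if not parents:
        # Source column: the trace bottoms out here
        return [(stage_id, column)]
    origins: list[tuple[str, str]] = []
    for upstream_stage, upstream_col in parents:
        origins.extend(trace_column_origin(upstream_col, upstream_stage, column_mappings))
    return origins
```

Because every hop is a declaration the component registry already holds, a derived column such as a join output resolves all the way back to the raw source columns without executing the pipeline.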
This lineage metadata is stored in PostgreSQL alongside the pipeline definition. The data catalog, impact analysis, and compliance audit features all consume this lineage data without any additional instrumentation overhead.
What This Means for Cupel Users
The pipeline compiler is the bridge between the visual experience and production execution. Pipeline builders work on the canvas without thinking about Temporal, retry policies, or workflow versioning. Platform architects can inspect the generated code to verify exactly what will execute. Compliance officers can trace column-level lineage from source to destination to satisfy regulatory requirements. And all three perspectives (visual, code, and lineage) are derived from the same compilation process, guaranteeing they are always consistent with each other.
Ready to build your data platform?
See how Cupel can streamline your data engineering workflows.
Explore Features