Parsl
Quiet but deadly Python HPC workflow manager
2025-11-17 — 2025-11-21
Wherein Parsl is described as a Python‑native workflow engine whose DAG is built at runtime and whose Slurm provider is shown to enable scaling from laptop to cluster while methods for unwrapping worker errors are given.
I recently switched my primary workflow engine from Snakemake to Parsl (“Parallel Scripting Library”). Parsl isn’t heavily promoted, and it doesn’t have many sexy, hipster design patterns or much graphic design in its online presence. However, it turned out to solve many of my problems.
- Code is here: Parsl/parsl
- Docs are here: Parsl
Parsl originated in academia and the high-performance computing community and was motivated by a need for a scalable, flexible way to orchestrate complex scientific workflows while staying in the Python ecosystem. It aims to address the demands of “big data” and the plateau in sequential processing power with a focus on the needs of people in public research institutions, enabling researchers to scale from a laptop to a supercomputer with minimal changes to their code.
Parsl is designed to execute data-oriented workflows in parallel where possible and in serial where needed. Like Snakemake or Make, it manages dependencies in a DAG (Directed Acyclic Graph). However, Parsl has better affordances for exploratory methodologies, by virtue of being more dynamic-feeling. While Snakemake requires us to define the entire workflow upfront in a static file, Parsl builds the dependency graph implicitly and dynamically at runtime, entirely within Python.
1 The API
Snakemake uses its own DSL (Domain-Specific Language) to configure jobs. I hated that. It broke IDE support and linting, was an unnecessary abstraction over Python, and cemented my side-eyed distrust of DSLs.
Parsl avoids that by being pure Python.
In Parsl, we define workflows by annotating standard Python functions with decorators. We call these annotated functions “Apps”.
import parsl
from parsl import python_app, bash_app
# A Parsl App that executes Python code
@python_app
def process_data(input_data):
    # ... complex processing ...
    return result

# A Parsl App that executes a shell command
@bash_app
def run_simulation(inputs, outputs):
    return f"my-simulator --in {inputs[0]} --out {outputs[0]}"

When we call a Parsl App, it doesn’t execute immediately. Instead, it returns a Future—an object representing the eventual result of the computation. This is a relatively modern approach in Python and is probably the ‘right’ way to do things.
If we pass the Future returned by one App as an argument to another App, Parsl automatically recognizes the dependency.
# Call the first app
future1 = process_data(initial_value)
# Call the second app, passing the future from the first
# Parsl knows this task depends on the completion of the first
future2 = process_data(future1)
# Execution happens asynchronously. We only block when we ask for the result.
print(future2.result())

Parsl constructs the DAG from these implicit data flows. When all inputs (Futures) for a task are ready, Parsl schedules the task on available resources. This pure-Python approach feels intuitive, integrates well with modern development tools, and lets us write complex, dynamic logic that’s impossible in a static DSL.
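For example, here is a minimal sketch (not from the Parsl docs; the app names and chunk count are made up) of the kind of runtime-dependent fan-out that a static rule file struggles to express:

from parsl import python_app

@python_app
def count_chunks(path):
    # Pretend this inspects the input and decides how much work there is.
    return 4

@python_app
def process_chunk(path, index):
    return f"{path}:chunk-{index}"

# Assumes parsl.load(config) has already been called.
# The number of downstream tasks depends on a runtime result: block briefly
# for the count, then keep building the graph dynamically in plain Python.
n_chunks = count_chunks("data.bin").result()
chunk_futures = [process_chunk("data.bin", i) for i in range(n_chunks)]
results = [f.result() for f in chunk_futures]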
2 How Cluster Execution Works
The main reason I tolerate workflow managers at all is that they handle the nightmare campus cluster horrors. Parsl has tight integration with batch schedulers like Slurm, PBS, SGE, and HTCondor, as well as with cloud providers and Kubernetes.
In Snakemake, we combine rules (with resource hints) and a separate YAML “profile” to map those hints to scheduler flags.
In Parsl, we handle configuration entirely within the Python script using a Config object. This object defines where and how tasks should run by combining Executors and Providers.
- Providers: These handle the interaction with the resource manager (e.g., SlurmProvider). They’re responsible for requesting, scaling, and terminating “blocks” of resources (like N nodes on a cluster).
- Executors: These manage task execution on the resources acquired by a Provider. The HighThroughputExecutor (HTEX) is commonly used for HPC scenarios, efficiently distributing tasks across many workers.
Here is a simple example of a configuration for a Slurm cluster:
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import SlurmProvider
import os
# Get dynamic values from the environment
slurm_account = os.getenv("SLURM_ACCOUNT", "default_account")
partition = os.getenv("SLURM_PARTITION", "standard")
config = Config(
    executors=[
        HighThroughputExecutor(
            label="my_hpc_cluster",
            max_workers_per_node=48,
            provider=SlurmProvider(
                account=slurm_account,
                partition=partition,
                nodes_per_block=10,  # Request 10 nodes per Slurm job
                init_blocks=1,
                max_blocks=5,  # Scale up to 5 blocks (50 nodes total)
                walltime="02:00:00",
                # Optional: Add specific scheduler options
                # scheduler_options="#SBATCH --gpus-per-node=4"
            ),
        )
    ]
)

# Load the configuration before executing the workflow
# import parsl
# parsl.load(config)

When the script runs, Parsl uses the SlurmProvider to submit sbatch jobs (the “blocks”). Once those jobs start, the HighThroughputExecutor connects to the allocated nodes and begins distributing the workflow tasks across them.
3 Python Configuration
In my experience with Snakemake (especially version 8+), configuring cluster execution was frustrating. Snakemake’s YAML-based executor profiles didn’t let us use environment variables.
This rigidity forced me to hard-code site-specific details, producing a proliferation of near-identical config files and terrible portability. Ultimately, I was generating YAML files programmatically just to work around the limitations.
Parsl’s configuration system is a massive improvement because it’s just Python. See the Parsl example above. We can use os.getenv() to dynamically pull the Slurm account or partition. We can use conditional logic, loops, and functions to construct the configuration object. We can easily integrate it with automatic config tools.
This flexibility is neat for writing portable workflows. We can define a single script that intelligently adapts to the environment it’s running in, whether that’s a local machine or a different HPC cluster, without the configuration-boilerplate nightmare that plagued my Snakemake setup.
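As a concrete illustration, here is a rough sketch (not from the Parsl docs; the detection logic and resource numbers are placeholders) of a config factory that falls back to local execution when Slurm isn’t available:

import os
import shutil

from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider, SlurmProvider

def make_config() -> Config:
    """Build a Parsl config that adapts to wherever the script is running."""
    if shutil.which("sbatch"):  # crude Slurm detection; adjust to taste
        provider = SlurmProvider(
            account=os.getenv("SLURM_ACCOUNT", "default_account"),
            partition=os.getenv("SLURM_PARTITION", "standard"),
            nodes_per_block=1,
            walltime="01:00:00",
        )
    else:
        provider = LocalProvider()
    return Config(
        executors=[HighThroughputExecutor(label="adaptive_htex", provider=provider)]
    )

# parsl.load(make_config())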
4 Local execution
The oldest local executor is the ThreadPoolExecutor. Threads are almost always more trouble than they’re worth in Python, IMO. If I wanted to spend time debugging non-deterministic segfaults, I’d switch to writing C.
The HighThroughputExecutor, however, runs its workers as separate processes, so it also serves as a sane process-based backend for local execution.
5 Providers
Parsl supports various Providers for public cloud backends (AWS, Google Cloud, Azure) and Kubernetes. Because the workflow logic (the Apps and their dependencies) is completely decoupled from the execution configuration (the Config object), running the same analysis on-prem or in the cloud often just means loading a different configuration object. It probably gets less seamless when I need to manage massive data assets, but I’ve managed to avoid that so far. TBC
6 Debugging Parsl workers
When Parsl tasks fail, the default error reporting is often opaque and unhelpful.
What’s happening:
- Parsl workers run in separate processes (via HTEX)
- When a worker crashes, the exception is wrapped in Parsl’s internal classes
- The actual Python traceback and error message are hidden inside the wrapper
- We only see “Dependency failure” without knowing the root cause
This is infuriating.
- No visibility into what actually went wrong
- We can’t distinguish between import errors, type errors, or logic bugs
- We have to manually inspect worker log files (if they exist)
- Errors propagate through dependency chains, hiding the original failure
What follows are some design patterns and code snippets to improve error visibility in Parsl workflows.
6.1 Error Extraction Wrapper
Add this function to our Parsl workflow file (e.g., parsl_task.py):
import logging

log = logging.getLogger(__name__)  # module-level logger used below


def unwrap_parsl_future(future, name: str):
    """Extract and surface exceptions from Parsl futures with full diagnostics.

    Args:
        future: Parsl AppFuture to unwrap
        name: Descriptive name for logging (e.g., "build_tgt_abc123")

    Returns:
        Future result if successful

    Raises:
        Original exception with enhanced logging of remote stdout/stderr
    """
    try:
        return future.result()
    except Exception as e:
        import traceback
        log.error(f"[{name}] FAILED in worker:")
        log.error("".join(traceback.format_exception(type(e), e, e.__traceback__)))
        # Dump remote debug info if available
        if hasattr(e, 'stdout') and e.stdout:
            log.error("---- WORKER STDOUT ----\n%s", e.stdout)
        if hasattr(e, 'stderr') and e.stderr:
            log.error("---- WORKER STDERR ----\n%s", e.stderr)
        # Log exception attributes for debugging Parsl wrappers
        log.error("Exception type: %s", type(e).__name__)
        log.error("Exception attributes: %s", dir(e))
        raise

This wrapper function:
- Catches all exceptions raised by .result() calls
- Formats the full traceback (including nested causes via __traceback__)
- Extracts remote stdout and stderr if they’re attached to the exception
- Logs exception metadata to help debug the wrapper’s behaviour
- Re-raises the original exception so the workflow still fails
Wrap All Future.result() Calls
Before (opaque errors):
for i, (future, record) in enumerate(zip(run_futures, run_records), 1):
    try:
        future.result()  # ❌ Hides worker exceptions
        log.info(" [%d/%d] ✓ %s", i, len(run_futures), record['run_id'])
    except Exception as e:
        log.error(" [%d/%d] ✗ FAILED: %s", i, len(run_futures), record['run_id'])
        log.error(" Error: %s", str(e))  # Only sees "Dependency failure"

After (full visibility):
for i, (future, record) in enumerate(zip(run_futures, run_records), 1):
    try:
        name = f"{record['target_id']}_{record['sampler']}_{record['run_id']}"
        unwrap_parsl_future(future, name)  # ✅ Extracts full traceback
        log.info(" [%d/%d] ✓ %s", i, len(run_futures), record['run_id'])
    except Exception as e:
        log.error(" [%d/%d] ✗ FAILED: %s", i, len(run_futures), record['run_id'])
        log.error(" Error: %s", str(e))

Now we can see:
- Exact line number where the error occurred (sample_cmd.py:251)
- Full Python traceback with nested calls
- Actual exception type (TypeError)
- Specific error message (“Object of type ArrayImpl is not JSON serializable”)
The next few sections are bonus options that might also be helpful.
6.2 Force worker logs to a visible directory
By default, Parsl workers may write logs to hidden or temp directories. We can force them to a known location:
In Parsl config YAML (config/parsl/local.yaml):
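The YAML itself isn’t reproduced here; the equivalent setting, if we build the executor directly in Python, is the worker_logdir_root argument on HighThroughputExecutor (the path below is a placeholder):

from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

# Placeholder path: pick a directory that survives the run and is easy to find.
htex = HighThroughputExecutor(
    label="local_htex",
    worker_logdir_root="/path/to/project/logs/parsl_workers",
    provider=LocalProvider(),
)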
Benefits:
- All workers’ stdout/stderr in one place
- Easier to inspect logs when remote exceptions don’t capture everything
- Can tail -f worker logs during execution
6.3 Instrument Entry Points
Add debug logging at the start of the Parsl app entry functions:
@python_app
def build_target_app(cfg_yaml, target_id, experiment):
    """Build a target via direct command call."""
    import logging
    log = logging.getLogger(__name__)
    log.info("[WORKER START] build_target_app(target_id=%s)", target_id)
    from lambda_hat.commands.build_cmd import build_entry
    result = build_entry(cfg_yaml, target_id, experiment)
    log.info("[WORKER END] build_target_app(target_id=%s)", target_id)
    return result

What this catches:
- Import-time failures (if a worker can’t load modules)
- Crashes before any logging happens in the command
- Helps distinguish “worker started but crashed” vs “worker never started”
7 Parsl Worker Error Zoo
I extracted this list of failures from the logs of my recent Parsl runs and had the LLM summarise them.
7.1 Error: “Dependency failure for task N”
Meaning: This task depends on another task (via inputs=[...]) that failed.
Solution:
1. Look for earlier task failures in the logs
2. Use unwrap_parsl_future() to inspect the root cause
3. Fix the upstream task, not the dependent one
7.2 Error: “ModuleNotFoundError” in worker
Meaning: The worker can’t import a required module.
Common causes:
- Virtual environment not activated in the worker
- Missing worker_init entry in the provider config
- Module installed in a different environment than the worker uses
Solution: make sure each block initialises the same environment the submit script uses, typically via the provider’s worker_init (see the sketch below).
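A minimal sketch of that fix, assuming a modules-based cluster and a conda environment called my_env (the account, partition, module line, and environment name are all placeholders):

from parsl.providers import SlurmProvider

# worker_init runs on the allocated nodes before workers start, so it is the
# right place to reproduce the submit-side environment.
provider = SlurmProvider(
    account="my_account",    # placeholder
    partition="standard",    # placeholder
    worker_init="module load python/3.11 && source activate my_env",
)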
7.3 Error: “PickleError: Can’t pickle <object>”
Meaning: Parsl can’t serialize the function arguments or return value.
Common culprits:
- Passing file handles, database connections, or threads
- Returning non-serializable objects (e.g. JAX arrays can be tricky to serialize)
Solution:
- Pass file paths (strings), not file objects
- Convert JAX arrays to numpy before returning: return np.asarray(jax_array) (a sketch follows below)
- Use primitives (int, float, str, dict, list) for return values
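To make the JAX case concrete, here is a hedged sketch of an App that keeps the heavy arrays on the worker and returns only plain, picklable values (the app name and statistics are arbitrary):

from parsl import python_app

@python_app
def summarise(values):
    # Imports live inside the app because the function body runs on a worker.
    import jax.numpy as jnp

    x = jnp.asarray(values)
    # Convert JAX scalars to plain floats so the return value pickles cleanly.
    return {"mean": float(jnp.mean(x)), "std": float(jnp.std(x))}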
7.4 Error: Worker crashes silently
Meaning: The worker process died without raising a Python exception.
Common causes:
- Out of memory (OOM killer)
- Segfault in a C extension (JAX, NumPy, etc.)
- Signal received (SIGKILL, timeout)
Solution:
1. Check worker_logdir_root for stderr files
2. Look for “Killed” messages (OOM)
3. Add memory limits to the executor config (see the sketch below)
4. Run the task manually outside Parsl to reproduce
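For the memory-limit option, the relevant knob (as far as I can tell) is mem_per_worker on HighThroughputExecutor, specified in GB; the numbers here are placeholders to tune per workload:

from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider

# With mem_per_worker set, HTEX limits how many workers start per node so
# that each worker has roughly that much memory available. Placeholder values.
htex = HighThroughputExecutor(
    label="memory_capped_htex",
    mem_per_worker=8,          # GB per worker
    max_workers_per_node=4,
    provider=LocalProvider(),
)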
7.5 Worker Error Catching Variations
7.5.1 Variation 1: Accumulate Errors and Continue
Instead of failing immediately, we collect errors and report them at the end:
failures = []
for i, (future, record) in enumerate(zip(run_futures, run_records), 1):
    try:
        unwrap_parsl_future(future, record['run_id'])
        log.info(" [%d/%d] ✓ %s", i, len(run_futures), record['run_id'])
    except Exception as e:
        log.error(" [%d/%d] ✗ FAILED: %s", i, len(run_futures), record['run_id'])
        failures.append({'record': record, 'error': str(e)})

# Report all failures at end
if failures:
    log.error("⚠ FAILURE SUMMARY: %d of %d runs failed", len(failures), len(run_futures))
    for f in failures:
        log.error(" • %s: %s", f['record']['run_id'], f['error'])

7.5.2 Variation 2: Retry Failed Tasks
Wrap the extraction in retry logic:
def unwrap_with_retry(future, name: str, max_retries: int = 3):
    """Extract error and optionally retry on transient failures."""
    # Note: calling .result() again on the same future returns the same
    # outcome; this only helps for transient errors raised while waiting.
    # A real retry means resubmitting the app call.
    for attempt in range(max_retries):
        try:
            return unwrap_parsl_future(future, name)
        except (TimeoutError, ConnectionError) as e:
            if attempt < max_retries - 1:
                log.warning(f"[{name}] Retry {attempt+1}/{max_retries} after: {e}")
                continue
            raise

7.5.3 Variation 3: Extract to Structured Log
For machine-readable error tracking, we extract errors to a structured log.
import json
from pathlib import Path


def unwrap_to_json(future, name: str, output_path: Path):
    """Extract error and write structured JSON for analysis."""
    try:
        result = future.result()
        output_path.write_text(json.dumps({
            'name': name,
            'status': 'success',
            'result': str(result),
        }))
        return result
    except Exception as e:
        import traceback
        output_path.write_text(json.dumps({
            'name': name,
            'status': 'failed',
            'error_type': type(e).__name__,
            'error_message': str(e),
            'traceback': traceback.format_exception(type(e), e, e.__traceback__),
        }, indent=2))
        raise

7.6 How this all works
7.7 Parsl’s Exception Wrapping Mechanism
When a Parsl worker raises an exception:
- Worker process: Exception occurs in the @python_app function
- Parsl serialization: Exception is pickled and sent back to the main process
- Parsl DataFlowKernel: Wraps the exception in AppException or DependencyError
- Future.result(): Re-raises the wrapped exception
The problem: By default, we only see the wrapper (DependencyError), not the original exception.
Our solution:
- Call .result() to unwrap it
- Catch the exception and extract __cause__ and __traceback__
- Format it with traceback.format_exception() to show the full chain
- Check for Parsl-specific attributes, like stdout and stderr
With the future wrapper approach, we recover:
- ✅ Full exception traceback (all nested calls)
- ✅ Exception message and type
- ✅ Line numbers in source files
- ✅ Local variables (in traceback context)
- ✅ Chained exceptions (__cause__, __context__)
We still need to do manual work to extract certain info:
- ❌ Worker’s stdout (unless we configure worker_logdir_root)
  - Fix: Set worker_logdir_root in the executor config.
- ❌ Worker’s stderr (same)
  - Fix: Same as above.
- ❌ Worker’s environment variables
  - Fix: Log them in the worker entry point (see the sketch after this list).
- ❌ Import-time errors (before the function runs)
  - Fix: Add instrumentation at the start of the function.
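For the environment-variable case, a small sketch of what that logging could look like at the top of an App (the variable selection and app name are arbitrary):

from parsl import python_app

@python_app
def diagnosed_app(arg):
    # Log a few environment variables at worker start so environment
    # mismatches show up in the worker logs.
    import logging
    import os

    log = logging.getLogger(__name__)
    for key in ("HOSTNAME", "SLURM_JOB_ID", "VIRTUAL_ENV", "CONDA_PREFIX", "PATH"):
        log.info("[WORKER ENV] %s=%s", key, os.environ.get(key))
    return arg  # actual work would go here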
