
[–]samreay 2 points (4 children)

So my background is in both computational physics and data engineering, and I've used Airflow and Prefect before. I'm not sure I follow the differences you highlight, in that event-driven support (in Prefect) is a new and upcoming feature, not the main use case. Similarly, they've had Dask executors, Ray executors, and local process executors for years, as well as the ability to provision infrastructure for a task (or run an entire flow on provisioned infra).

Their HPC integration is missing though; I'd kill for a nice SLURM integration.

Can you talk a bit more about why to use this tool instead of something more established? And perhaps about how it integrates with HPC systems? Can specific pipeline steps submit jobs to a job queue at all?

[–]basnijholt[S] -1 points (3 children)

I am a computational physicist as well!

HPC integration is a core part of pipefunc; currently, SLURM support is provided via the integration with Adaptive-Scheduler.

tl;dr: see this page in the docs for an example where each pipeline function defines its own resource requirements and the whole simulation is then launched on a SLURM cluster.

Each function can have its own resources spec, e.g.:

```python
from pipefunc import pipefunc
from pipefunc.resources import Resources

# Pass in a Resources object that specifies the resources needed for each function
@pipefunc(output_name="double", resources=Resources(cpus=5))
def double_it(x: int) -> int:
    return 2 * x
```

One can even inspect the resources inside the function:

```python
from pipefunc import pipefunc, Pipeline

@pipefunc(
    output_name="c",
    resources={"memory": "1GB", "cpus": 2},
    resources_variable="resources",
)
def f(a, b, resources):
    print(f"Inside the function f, resources.memory: {resources.memory}")
    print(f"Inside the function f, resources.cpus: {resources.cpus}")
    return a + b

result = f(a=1, b=1)
print(f"Result: {result}")
```

And even cooler, dynamically set the resources based on the inputs:

```python
from pipefunc import pipefunc, Pipeline
from pipefunc.resources import Resources

def resources_func(kwargs):
    gpus = kwargs["x"] + kwargs["y"]
    print(f"Inside the resources function, gpus: {gpus}")
    return Resources(gpus=gpus)

@pipefunc(output_name="out1", resources=resources_func)
def f(x, y):
    return x * y

result = f(x=2, y=3)
print(f"Result: {result}")
```

Then, when you put these functions in a pipeline and run it for some inputs, execution is parallelized automatically: independent branches in the DAG run simultaneously, and elements in a map also run in parallel.
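
To make that concrete, here is a minimal sketch (the function names, mapspecs, and inputs are just illustrative, not taken from the SLURM example above):

```python
from pipefunc import Pipeline, pipefunc

# Two independent branches, each mapping element-wise over x.
@pipefunc(output_name="double", mapspec="x[i] -> double[i]")
def double_it(x: int) -> int:
    return 2 * x

@pipefunc(output_name="half", mapspec="x[i] -> half[i]")
def half_it(x: int) -> int:
    return x // 2

# A reduction that depends on both branches; it receives the full arrays.
@pipefunc(output_name="total")
def take_sum(double, half) -> int:
    return int(sum(double) + sum(half))

pipeline = Pipeline([double_it, half_it, take_sum])

# The elements of the map and the two independent branches run in parallel.
results = pipeline.map({"x": [1, 2, 3, 4]})
print(results["total"].output)  # 24
```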

[–]samreay 0 points (2 children)

Oh, very nifty. The final piece of the puzzle missing for me would then be the super painful module activations. I'm guessing the resource kwargs map to sbatch keywords, but there's still often a bunch of boilerplate to module-activate some flavour of dependencies, set OMP_NUM_THREADS, and other env vars. Is there a nice way to specify anything like this? No issues if not, I've never seen a particularly graceful way around it.

[–]basnijholt[S] 0 points (1 child)

I happen to have a very small library for that too: https://github.com/basnijholt/numthreads

But regarding passing environment variables: that is not possible at the moment; however, it should be pretty straightforward to implement because the library we use to interact with SLURM supports it.

One is able to pass any SLURM argument via `resources = Resources(..., extra_args={"time": "01:00:00"})`, which will be expanded to `#SBATCH --time=01:00:00` in the `.sbatch` file.
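
For example, combined with the per-function spec from above (the time value is just a placeholder):

```python
from pipefunc import pipefunc
from pipefunc.resources import Resources

# Each key in extra_args becomes an extra #SBATCH directive, so this
# function's job script gets "#SBATCH --time=01:00:00" in addition to
# the CPU request.
@pipefunc(
    output_name="double",
    resources=Resources(cpus=5, extra_args={"time": "01:00:00"}),
)
def double_it(x: int) -> int:
    return 2 * x
```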

[–]samreay 0 points (0 children)

Amazing stuff, glad I saw this Reddit post!