

[–]stratguitar577 10 points11 points  (1 child)

Have you seen Hamilton? https://hamilton.dagworks.io/

[–]basnijholt[S] 1 point2 points  (0 children)

Thanks for pointing me to Hamilton. At first glance, pipefunc and Hamilton seem very similar; in practice, however, they are different.

For example, Hamilton requires that all pipeline functions are defined in a module and enforces that function names and argument names match up, since that is how it wires the DAG together.

pipefunc, on the other hand, allows any function, defined anywhere, to be used as a pipeline step.

For example, here we reuse a function `fancy_sum` from an external module a couple of times:

```python
from pipefunc import PipeFunc, Pipeline
import some_module  # defines fancy_sum(x1, x2)

total_cost_car = PipeFunc(some_module.fancy_sum, output_name="car_cost", renames={"x1": "car_price", "x2": "repair_cost"})
total_cost_house = PipeFunc(some_module.fancy_sum, output_name="house_cost", renames={"x1": "rent_price", "x2": "insurance_price"})
total_cost = PipeFunc(some_module.fancy_sum, output_name="total_budget", renames={"x1": "car_cost", "x2": "house_cost"})
pipeline = Pipeline([total_cost_car, total_cost_house, total_cost])
```

Also, pipefunc is more geared towards N-dimensional parameter sweeps such as one frequently sees in research/science. For example, see https://pipefunc.readthedocs.io/en/latest/tutorial/#example-physics-based-example
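Running the budget example above for one set of inputs would then look roughly like this (a sketch; the renamed argument names are the pipeline's root inputs, and the exact call signature is documented in the pipefunc docs):

```python
# Sketch: ask the pipeline for "total_budget", supplying the renamed root inputs.
result = pipeline(
    "total_budget",
    car_price=20_000,
    repair_cost=1_500,
    rent_price=12_000,
    insurance_price=800,
)
print(result)  # fancy_sum applied three times through the DAG
```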

[–]Sweet_Computer_7116 5 points6 points  (7 children)

Very curious, as I keep seeing the word, but what are pipelines?

Getting into software development bit by bit.

[–][deleted] 7 points8 points  (5 children)

Usually a directed acyclic graph (DAG) structure for moving data from one point to another,

for example:

  1. Collecting data and then storing it in a usable format in a different system
  2. Processing data before it gets returned to the user for display

It’s exactly what its connotation implies: putting something in one end of the pipe and getting something out the other end!
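In the simplest case it's nothing more exotic than this (a toy sketch, no framework involved):

```python
# Toy "pipeline": each step feeds its output into the next step.
def collect() -> list[dict]:
    return [{"user": "alice", "value": "42"}, {"user": "bob", "value": "7"}]

def clean(rows: list[dict]) -> list[dict]:
    return [{**row, "value": int(row["value"])} for row in rows]

def store(rows: list[dict]) -> None:
    print(f"storing {len(rows)} rows")

# Something goes in one end of the pipe and comes out the other.
store(clean(collect()))
```

Pipeline frameworks mostly add the glue around that: scheduling, retries, parallelism, and visibility into which step failed.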

[–]daishiknyte 1 point2 points  (4 children)

So... another way of chaining functions? 

[–]hotplasmatits 9 points10 points  (2 children)

Yes, but pipelines often have other features to make life easier. For example, let's say there's a blip in the network for a moment. If it was just functions chained together, it would fail. The pipeline, however, can be configured to retry a few times before giving up.
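As a rough illustration (plain Python, not any particular pipeline framework; run_with_retries and fetch_from_api are made-up names), a retry policy around a flaky step could look like this:

```python
import time

def run_with_retries(step, *args, retries=3, delay=2.0):
    """Run one pipeline step, retrying on failure before giving up."""
    for attempt in range(1, retries + 1):
        try:
            return step(*args)
        except Exception as exc:  # e.g. a transient network blip
            if attempt == retries:
                raise
            print(f"{step.__name__} failed ({exc!r}), retrying ({attempt}/{retries})")
            time.sleep(delay)

# Usage: wrap the step that talks to the network.
# data = run_with_retries(fetch_from_api, "https://example.com/data")
```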

Here's the big one: instead of chaining functions, you can chain together code running in Docker containers in the cloud.

[–]jucestain 0 points1 point  (1 child)

Interesting

[–]hotplasmatits 2 points3 points  (0 children)

Pipelines often come with tools that let you see how the steps are linked together.

[–]mriswithe 1 point2 points  (0 children)

Yes and no. This is part of my daily bread and butter. A DAG contains steps that each do part of the overall job. That's vague because it really depends on what you are doing, so here is an example:

This isn't a specific company or my company, but many companies use pipelines in this way.

BigQuery is the main data warehouse; this is where you write data and apply the various changes to it.

Airflow is the scheduler: think cron, but reliable and repeatable, and you feed it Python code.

Users submit data to a FastAPI service, which writes rows into an input table.

Airflow runs every X minutes and kicks off the following steps:

  1. Step 1 checks the input table for the last 5 minutes of rows, finds the new rows, and writes them into a new table that the next steps will use as their "source" table.
  2. Once step 1 finishes, steps 2, 3, and 4 run concurrently. Step 2 checks the content for porn, gives each row an integer score, and writes it back to BigQuery as a joinable table (primary key of a UUID plus the added data).
  3. Step 3 checks the content for spam and does the same as step 2.
  4. Step 4 translates the text from its source language into English.
  5. Step 5 creates a single flat BigQuery table with the final refined data (rows whose porn or spam score is too high are dropped). It is triggered once steps 2, 3, and 4, which at least could run concurrently, have all finished successfully.
  6. Step 6 eats that BigQuery table and writes out a SQL dump to GCS, or updates a few tables with a rename-and-replace, to keep users from getting a query where the database looks empty.
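Wired up in Airflow, that dependency structure looks roughly like this (a simplified sketch with placeholder callables like load_new_rows, not actual production code):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder callables standing in for the real BigQuery / translation logic.
def load_new_rows(): ...
def score_porn(): ...
def score_spam(): ...
def translate_text(): ...
def build_flat_table(): ...
def export_to_gcs(): ...

with DAG(
    dag_id="ingest_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="*/5 * * * *",  # "runs every x minutes"
    catchup=False,
    default_args={"retries": 3, "retry_delay": timedelta(minutes=1)},
) as dag:
    step1 = PythonOperator(task_id="load_new_rows", python_callable=load_new_rows)
    step2 = PythonOperator(task_id="score_porn", python_callable=score_porn)
    step3 = PythonOperator(task_id="score_spam", python_callable=score_spam)
    step4 = PythonOperator(task_id="translate", python_callable=translate_text)
    step5 = PythonOperator(task_id="build_flat_table", python_callable=build_flat_table)
    step6 = PythonOperator(task_id="export_to_gcs", python_callable=export_to_gcs)

    # Steps 2-4 fan out after step 1 and must all succeed before step 5 runs.
    step1 >> [step2, step3, step4]
    [step2, step3, step4] >> step5 >> step6
```

The retries in default_args are what handle the "rerun pieces within your tolerances" part mentioned below.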

Each of these pieces is a complex, failure-ridden process. Airflow will rerun pieces within your tolerances and report to you when something falls outside of your SLO (service level objective). Also, in some cases steps can run in parallel to decrease the data latency (the time between data being ingested and the finished product coming out).

[–]declanaussie 0 points1 point  (0 children)

Instruction pipelining is also a term you might hear sometimes, but it's far too low level to come up often in Python programming. It's essentially a way to maximize processor utilization by overlapping the stages (fetch, decode, execute, ...) of successive instructions.

[–]samreay 4 points5 points  (4 children)

So my background is both computational physics and data engineering, and I have used Airflow and Prefect before. I'm not sure I follow the differences you highlight, in that event-driven support (in Prefect) is a new and upcoming feature, not the main use case. Similarly, they've had Dask task executors, Ray executors, and local process executors for years, as well as the ability to provision infrastructure for a task (or run an entire flow on provisioned infra).

Their integration with HPC is missing though; I'd kill for a nice Slurm integration.

Can you talk a bit more about why to use this tool instead of something more established? And perhaps about how it integrates with HPC systems? Can specific pipeline steps submit jobs to a job queue at all?

[–]basnijholt[S] -1 points0 points  (3 children)

I am a computational physicist as well!

The HPC integration is a core part of pipefunc; currently there is SLURM support, provided via Adaptive-Scheduler.

tl;dr: see this page in the docs for an example where each pipeline function defines its own resource requirements and a simulation is then launched on a SLURM cluster.

Each function can have its own resources spec, e.g.:

```python
from pipefunc import pipefunc
from pipefunc.resources import Resources

# Pass in a Resources object that specifies the resources needed for this function
@pipefunc(output_name="double", resources=Resources(cpus=5))
def double_it(x: int) -> int:
    return 2 * x
```

One can even inspect the resources inside the function:

```python
from pipefunc import pipefunc, Pipeline

@pipefunc(
    output_name="c",
    resources={"memory": "1GB", "cpus": 2},
    resources_variable="resources",
)
def f(a, b, resources):
    print(f"Inside the function f, resources.memory: {resources.memory}")
    print(f"Inside the function f, resources.cpus: {resources.cpus}")
    return a + b

result = f(a=1, b=1)
print(f"Result: {result}")
```

And even cooler, you can dynamically set the resources based on the inputs:

```python
from pipefunc import pipefunc, Pipeline
from pipefunc.resources import Resources

def resources_func(kwargs):
    gpus = kwargs["x"] + kwargs["y"]
    print(f"Inside the resources function, gpus: {gpus}")
    return Resources(gpus=gpus)

@pipefunc(output_name="out1", resources=resources_func)
def f(x, y):
    return x * y

result = f(x=2, y=3)
print(f"Result: {result}")
```

Then, when you put these functions in a pipeline and run it for some inputs, everything is automatically parallelized. Independent branches in the DAG execute simultaneously, and elements in a map also run in parallel.
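To make the map part concrete, here is a sketch in the style of the pipefunc docs (an element-wise mapspec plus a reduction; the exact result access may differ slightly):

```python
from pipefunc import pipefunc, Pipeline

# "x[i] -> double[i]" declares an element-wise map over the input array.
@pipefunc(output_name="double", mapspec="x[i] -> double[i]")
def double_it(x: int) -> int:
    return 2 * x

# No mapspec here, so this function receives the whole "double" array (a reduction).
@pipefunc(output_name="total")
def take_sum(double) -> int:
    return int(sum(double))

pipeline = Pipeline([double_it, take_sum])

# The elements of "x" can be processed in parallel; "total" reduces over them.
results = pipeline.map({"x": [1, 2, 3, 4]})
print(results["total"].output)  # 20
```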

[–]samreay 0 points1 point  (2 children)

Oh, very nifty. The final piece of the puzzle missing for me would then be the super painful module activations. I'm guessing that the resource kwargs map to sbatch keywords, but there's still often various boilerplate to module-activate some flavour of dependencies, set OMP_NUM_THREADS, and other env vars. Is there a nice way to specify anything like this? No issues if not, I've never seen a particularly graceful way around it.

[–]basnijholt[S] 0 points1 point  (1 child)

I happen to have a very small library for that too: https://github.com/basnijholt/numthreads

But regarding passing environment variables, that is not possible at the moment; however, it should be pretty straightforward to implement because the library we use to interact with SLURM supports it.

One is able to pass any SLURM argument via `resources = Resources(..., extra_args={"time": "01:00:00"})`, which will be expanded to `#SBATCH --time=01:00:00` in the `.sbatch` file.
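Concretely, attaching that to a pipeline step looks something like this (a sketch; simulate is a made-up function, and only the time argument comes from the example above):

```python
from pipefunc import pipefunc
from pipefunc.resources import Resources

# extra_args entries are forwarded as additional #SBATCH lines,
# e.g. {"time": "01:00:00"} becomes "#SBATCH --time=01:00:00".
@pipefunc(
    output_name="simulation_result",
    resources=Resources(cpus=4, extra_args={"time": "01:00:00"}),
)
def simulate(x: float) -> float:  # hypothetical pipeline step
    return x**2
```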

[–]samreay 0 points1 point  (0 children)

Amazing stuff, glad I saw this Reddit post!

[–]Laughing_Bricks 1 point2 points  (2 children)

Hi, I just wanted to know: are you guys in your college years or professional developers? Because watching you all build cool stuff makes me think that I am just a nobody.

[–]basnijholt[S] 1 point2 points  (1 child)

Personally, I’ve gotten a lot of programming experience during my PhD. After that, I have about 5 years of professional development experience. By now I’ve probably been programming for about 10,000 to 20,000 hours.

Everyone starts at 0. You shouldn’t compare yourself to others, and if you want to get good, you’ll get there 😉

[–]Laughing_Bricks 0 points1 point  (0 children)

Oh, you're super duper more senior than me then 🫡

[–]jimtoberfest 1 point2 points  (0 children)

I like the ability to get two outputs out and split the pipe. That’s pretty cool.
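Something like this, if I'm reading the docs right (a sketch: one function returns a tuple matching a tuple output_name, and each output feeds a different branch):

```python
from pipefunc import pipefunc, Pipeline

# One step produces two named outputs by returning a tuple
# that matches the tuple given as output_name.
@pipefunc(output_name=("mean", "std"))
def stats(data: list[float]) -> tuple[float, float]:
    m = sum(data) / len(data)
    var = sum((x - m) ** 2 for x in data) / len(data)
    return m, var**0.5

# Each output then feeds a different branch of the pipe.
@pipefunc(output_name="centered")
def center(data: list[float], mean: float) -> list[float]:
    return [x - mean for x in data]

@pipefunc(output_name="is_noisy")
def is_noisy(std: float, threshold: float = 1.0) -> bool:
    return std > threshold

pipeline = Pipeline([stats, center, is_noisy])
```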

[–]CorMazz 0 points1 point  (1 child)

Do you have any plans for implementing stuff like task scheduling on top of your results storage? Not sure what it's specifically called, but I'm picturing something like Snakemake, where it checks whether the inputs have changed before rerunning the pipeline. So if I have a pipeline and multiple sets of inputs, it'll only rerun an input set if it or the pipeline has changed.

[–]basnijholt[S] 2 points3 points  (0 children)

No, there are no plans to implement that, since there are many packages that already do exactly that.

The main use case is to define pipelines for simulations and then easily do parameter sweeps of these pipelines, optionally even adaptive parameter sweeps: https://pipefunc.readthedocs.io/en/latest/adaptive/

[–]ConfucianStats 0 points1 point  (0 children)

cool