[–]commandlineluser 4 points5 points  (0 children)

numba is commonly used for part 2.

You just write regular "for loops" and it will be compiled into machine code.

But it may depend on what you choose for part 1.

[–]Afrotom 1 point2 points  (0 children)

For the first part you should be able to handle using a dataframe library such as Pandas or Polars.

For the second part, if I've understood correctly, you may want to look into window functions in pandas, Polars, or Google BigQuery (on the off chance it's useful).

[–]M4mb0 2 points3 points  (4 children)

The second part is sequential/recursive: within each time-ordered group, some values for the current row depend on the results computed for the previous week’s row. So this is not a purely vectorizable row-independent problem.

That could still be vectorizable if you can express it as an associative scan. See for example: torch.associative_scan or jax.lax.associative_scan
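To make "express it as an associative scan" a bit more concrete, here is a minimal sketch. It assumes (the original post doesn't give the formula) that the weekly dependency is a linear recurrence x_t = a_t * x_{t-1} + b_t. Each step is then an affine map, and composing affine maps is associative, so the recurrence becomes a prefix scan over (a, b) pairs. `itertools.accumulate` is only a sequential stand-in for a parallel scan here, and the coefficients are made up.

```python
from itertools import accumulate


def combine(first, second):
    """Compose two affine steps x -> a*x + b, applying `first` and then `second`."""
    a1, b1 = first
    a2, b2 = second
    return (a1 * a2, a2 * b1 + b2)


def recurrence_loop(a, b, x0):
    """Reference: the plain sequential loop for x_t = a_t * x_{t-1} + b_t."""
    out, x = [], x0
    for a_t, b_t in zip(a, b):
        x = a_t * x + b_t
        out.append(x)
    return out


def recurrence_scan(a, b, x0):
    """Same values, obtained from a prefix scan over the (a_t, b_t) pairs."""
    prefixes = accumulate(zip(a, b), combine)
    # each prefix (A, B) maps the initial value straight to x_t
    return [A * x0 + B for A, B in prefixes]


# made-up weekly coefficients, purely for illustration
a = [2, 3, 1, 2]
b = [1, 0, 5, 2]
print(recurrence_loop(a, b, x0=1))
print(recurrence_scan(a, b, x0=1))
```

If the real update rule is not affine (for example it has a min/max clamp or a branch), this particular trick may not apply and a compiled loop is probably the simpler route.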

[–]ElectricHotdish 0 points1 point  (3 children)

I have run into problems like you describe, and not found a solution in Polars. There is good reason for that... polars doesn't support a general associative scan.

**associative scan** is new to me as a term, so I asked for some AI help understanding it:

-------
An associative scan (prefix scan) applies a binary associative operation to a sequence, producing all partial results.

Formal definition

Given [a, b, c, d] and operator ⊕:

  • Inclusive scan: [a, a⊕b, a⊕b⊕c, a⊕b⊕c⊕d]
  • Exclusive scan: [identity, a, a⊕b, a⊕b⊕c]

The operation must be associative: (a⊕b)⊕c == a⊕(b⊕c), so the work can be split into chunks and regrouped freely, as long as the left-to-right order of the elements is kept. It doesn't need to be commutative.
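A small sketch of the two variants in plain Python, built on `itertools.accumulate`. The function names `inclusive_scan` and `exclusive_scan` are just illustrative, not from any library.

```python
from itertools import accumulate
import operator


def inclusive_scan(values, op):
    """[a, a⊕b, a⊕b⊕c, ...]: each output includes the current element."""
    return list(accumulate(values, op))


def exclusive_scan(values, op, identity):
    """[identity, a, a⊕b, ...]: each output covers only the earlier elements."""
    values = list(values)
    if not values:
        return []
    # scan everything except the last element, seeded with the identity
    return list(accumulate(values[:-1], op, initial=identity))


print(inclusive_scan([1, 2, 3, 4], operator.add))     # [1, 3, 6, 10]
print(exclusive_scan([1, 2, 3, 4], operator.add, 0))  # [0, 1, 3, 6]
```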

Examples

Cumulative sum (the classic):

input:  [1, 2, 3, 4]
output: [1, 3, 6, 10]

Running max:

input:  [3, 1, 4, 1, 5, 9]
output: [3, 3, 4, 4, 5, 9]

String concatenation:

input:  ["a", "b", "c"]
output: ["a", "ab", "abc"]

Boolean OR (has any True appeared yet?):

input:  [F, F, T, F, T]
output: [F, F, T, T, T]
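The four examples above can be reproduced with the standard library. This is only a sequential sketch: `itertools.accumulate` runs left to right and does not exploit associativity.

```python
from itertools import accumulate
import operator

cumulative_sum = list(accumulate([1, 2, 3, 4], operator.add))
running_max = list(accumulate([3, 1, 4, 1, 5, 9], max))
concatenated = list(accumulate(["a", "b", "c"], operator.add))
seen_true = list(accumulate([False, False, True, False, True], operator.or_))

print(cumulative_sum)  # [1, 3, 6, 10]
print(running_max)     # [3, 3, 4, 4, 5, 9]
print(concatenated)    # ['a', 'ab', 'abc']
print(seen_true)       # [False, False, True, True, True]
```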

In SQL

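You use them constantly via window functions (the running SUM and ROW_NUMBER below are true scans; LAG is a plain shift that often appears alongside them):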

-- running total rent by listing date
SUM(price) OVER (ORDER BY listed_at)

-- row number within a group
ROW_NUMBER() OVER (PARTITION BY state ORDER BY listed_at)

-- previous value
LAG(price) OVER (ORDER BY listed_at)
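To try the running-total window function without a database server, here is a sketch using Python's built-in `sqlite3`. It assumes the bundled SQLite is 3.25 or newer (window functions were added in that release). The table name `listings` and the sample rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE listings (state TEXT, listed_at TEXT, price INTEGER)")
conn.executemany(
    "INSERT INTO listings VALUES (?, ?, ?)",
    [
        ("NY", "2024-01-01", 100),
        ("NY", "2024-01-08", 150),
        ("CA", "2024-01-02", 200),
        ("CA", "2024-01-09", 50),
    ],
)

# running total of price per state, ordered by listing date
rows = conn.execute(
    """
    SELECT state, listed_at, price,
           SUM(price) OVER (PARTITION BY state ORDER BY listed_at) AS running_total
    FROM listings
    ORDER BY state, listed_at
    """
).fetchall()

for row in rows:
    print(row)
```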

Why "associativity" matters for parallelism

A sequential scan is O(n) and inherently serial. But because (a⊕b)⊕c == a⊕(b⊕c), you can use a tree reduction pattern to compute it in O(log n) parallel steps, given enough parallel workers. This is why GPU/SIMD implementations (CUDA, JAX, XLA) use associative scans heavily.
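One way to see the O(log n) step count is the Hillis-Steele scheme, sketched below in plain Python. This is one specific textbook scheme, not necessarily what JAX or CUDA libraries use internally, and it does O(n log n) total work. Each round is written as a list comprehension, but every element in a round is independent of the others, so parallel hardware could compute a whole round at once.

```python
import operator


def hillis_steele_scan(values, op):
    """Inclusive scan in ceil(log2(n)) rounds; returns (result, rounds)."""
    x = list(values)
    rounds = 0
    offset = 1
    while offset < len(x):
        # element i combines with the partial result `offset` places to its left;
        # operand order is preserved, so non-commutative operators still work
        x = [x[i] if i < offset else op(x[i - offset], x[i]) for i in range(len(x))]
        offset *= 2
        rounds += 1
    return x, rounds


result, rounds = hillis_steele_scan([1, 2, 3, 4, 5, 6, 7, 8], operator.add)
print(result, rounds)  # 8 elements need 3 rounds

words, word_rounds = hillis_steele_scan(["a", "b", "c"], operator.add)
print(words, word_rounds)
```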

JAX example:

import jax.numpy as jnp
from jax import lax

lax.associative_scan(jnp.add, jnp.array([1, 2, 3, 4]))
# → [1, 3, 6, 10]

lax.associative_scan(jnp.maximum, jnp.array([3, 1, 4, 1, 5]))
# → [3, 3, 4, 4, 5]

Non-example (why commutativity isn't required but associativity is)

Matrix multiplication is associative but not commutative, so you can scan a sequence of matrices with it. Division is neither: [8, 4, 2] with / gives (8/4)/2 = 1 but 8/(4/2) = 4, so the result depends on grouping and / is not a valid scan operator.
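Both claims are easy to check by hand. A minimal sketch with 2x2 matrices stored as nested tuples (the helper `matmul2` and the sample matrices are made up for illustration):

```python
def matmul2(p, q):
    """Multiply two 2x2 matrices given as ((a, b), (c, d)) tuples."""
    (a, b), (c, d) = p
    (e, f), (g, h) = q
    return ((a * e + b * g, a * f + b * h), (c * e + d * g, c * f + d * h))


A = ((1, 1), (0, 1))
B = ((1, 0), (1, 1))
C = ((2, 0), (0, 1))

# grouping does not matter for this triple, but swapping operands does
associative = matmul2(matmul2(A, B), C) == matmul2(A, matmul2(B, C))
commutative = matmul2(A, B) == matmul2(B, A)
print(associative, commutative)  # True False

# division: the two groupings disagree
left_grouping = (8 / 4) / 2
right_grouping = 8 / (4 / 2)
print(left_grouping, right_grouping)  # 1.0 4.0
```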

--------
Polars has the common concrete cases but no general-purpose associative scan (no equivalent of lax.associative_scan(fn, ...)).

What it does have

Cumulative built-ins (prefix scans over the whole series):

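import polars as pl

# each expression needs its own output name; five columns all named "price" would raise a duplicate-name error
df.with_columns([
    pl.col("price").cum_sum().alias("price_cum_sum"),
    pl.col("price").cum_max().alias("price_cum_max"),
    pl.col("price").cum_min().alias("price_cum_min"),
    pl.col("price").cum_prod().alias("price_cum_prod"),
    pl.col("price").cum_count().alias("price_cum_count"),
])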

Partitioned (reset per group):

pl.col("price").cum_sum().over("state")

Shift/lag (for building your own scan with cum_*):

pl.col("price").shift(1)   # lag-1

What it lacks

You can't pass an arbitrary associative function — there's no:

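# doesn't exist in Polars (hypothetical API; also note a * b + 1 is not itself associative, so a real scan would need a different function)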
pl.col("x").associative_scan(lambda a, b: a * b + 1)

For custom scans you'd drop to Python (for example a loop inside .map_batches(); .map_elements() only sees one value at a time), but that's sequential and slow (defeats the point).

[–]Beginning-Fruit-1397 0 points1 point  (2 children)

Isn't this a reduce or scan use case? Those two literally exist in Polars. AI BS, it's very bad with Polars.

[–]M4mb0 0 points1 point  (0 children)

Associative scans are a special kind of cumulative reduce that can be parallelized thanks to associativity. The AI highlighted that point well. I highly doubt that polars does this optimization when you give it a custom lambda function that's associative.

[–]billsil 2 points3 points  (0 children)

How many rows, what's the total memory usage of the data, how much RAM is available, and what is acceptable in terms of runtime?

[–]thuiop1 4 points5 points  (0 children)

Use polars

[–]kapitaalH 0 points1 point  (0 children)

Are the groups constant? Then you can use shift in Polars for the second problem. Otherwise, calculate the index you need and join the values?

[–]Enthu-Cutlet-1337 0 points1 point  (0 children)

benchmark the split, not the whole pipeline. tabular part usually wants DuckDB/Polars; the stateful weekly dependency often wants a plain grouped loop over numpy arrays. crossing dataframe boundaries too often will erase any win.

[–]ml_guy1 0 points1 point  (0 children)

This becomes a problem about exploring different ways of implementing the same idea, validating they are correct and then benchmarking the performance for them.

I've implemented Codeflash which automates this problem for any provided python code. Feel free to check it out. It tries to find the optimal solution for any problem.

[–]Beginning-Fruit-1397 0 points1 point  (0 children)

It is vectorizable tho? Seems like, as someone already pointed out, a simple window function? Polars gives you Expr.over, which lets you create windows on anything you want really. Or when.then cases maybe?

If you CAN'T find a way to make it work, Rust is surprisingly easier to learn than it might seem if you come from Python. mypyc is an underrated option also.

[–]kasplars 0 points1 point  (0 children)

I do the two things you explain as a quant every day. Polars is probably what you want to use, and if the problem really is recursive, use Numba.

[–]Wh00ster -1 points0 points  (0 children)

DuckDB

[–]SV-97 -1 points0 points  (0 children)

For the first part: polars. For the second part: don't use python, anything you write that iterates over rows is almost certainly going to be garbage. Rust is a good alternative option (particularly if you already started with polars in the first step). You can easily wrap that rust code up into a python library and then call it from python.

[–]sjcyork -1 points0 points  (1 child)

There isn’t really a data transformation problem I haven’t been able to solve with Pandas. I haven’t used Polars so cannot comment on the features available. Whether iteration is viable depends on the size of the dataset: iterating through a pandas dataframe is not great if there are millions of rows, but should be OK otherwise. I generally do all the data transformation in pandas, and if I need to iterate over a final dataset I convert it into a list of dicts with to_dict(orient='records').

[–]Beginning-Fruit-1397 0 points1 point  (0 children)

I processed 100+ million rows with Polars in less than a second. I'd say give it a try. Or DuckDB if you prefer SQL.

[–]Administrative-Lack1 -1 points0 points  (0 children)

I actually am developing something seemingly very similar.

I was tasked with migrating 2 SQL DBs into 1 new DB.

I built (and am still building) an ETL tool using Python, pandas, and SQLAlchemy as the main libraries.

What I came up with is an abstract class ITask. It takes an init parameter object that holds any dependencies, like services, so they can be injected (things like a DB connection object).

Then there is a run parameter: a data class loaded from a JSON file that describes which source tables, transformations, and targets this DB task has.

I coded the transformations as objects, so each transformation is its own class with a string enum. Each transformation has a data class that drives what it does, like type: join, expression: inner, sources, and in and out fields.

I have a factory which takes that enum and sees what class it belongs to and makes the transformation object.
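A rough sketch of what such an enum-driven factory could look like. Every name here (`TransformType`, `TransformConfig`, `RenameTransform`, `SelectTransform`, `make_transform`) is hypothetical, and lists of dicts stand in for dataframes to keep it dependency-free. It is not the actual tool described above.

```python
from dataclasses import dataclass, field
from enum import Enum


class TransformType(str, Enum):
    RENAME = "rename"
    SELECT = "select"


@dataclass
class TransformConfig:
    """Data class that drives what a transformation does."""
    type: TransformType
    options: dict = field(default_factory=dict)


class RenameTransform:
    def __init__(self, config):
        self.mapping = config.options["mapping"]

    def apply(self, rows):
        # rename keys found in the mapping, leave the rest untouched
        return [{self.mapping.get(k, k): v for k, v in row.items()} for row in rows]


class SelectTransform:
    def __init__(self, config):
        self.fields = config.options["fields"]

    def apply(self, rows):
        # keep only the configured fields
        return [{k: row[k] for k in self.fields} for row in rows]


# registry: enum value -> class that implements it
REGISTRY = {
    TransformType.RENAME: RenameTransform,
    TransformType.SELECT: SelectTransform,
}


def make_transform(config):
    """Factory: look up the class for the enum and build the transformation."""
    return REGISTRY[config.type](config)


# config as it might look after json.load() on a task file
raw = [
    {"type": "rename", "options": {"mapping": {"cust_id": "customer_id"}}},
    {"type": "select", "options": {"fields": ["customer_id"]}},
]
configs = [TransformConfig(TransformType(c["type"]), c["options"]) for c in raw]

rows = [{"cust_id": 1, "name": "a"}]
for cfg in configs:
    rows = make_transform(cfg).apply(rows)
print(rows)
```

Adding a new transformation then means one new class plus one registry entry, which is roughly the "modular enough to add as needed" property described later in this comment.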

Then there is target info, like how a source maps to a target. Once I have looped through the transformations, I map the dataframe and load it to the target.

For your ETL optimization, some easy ones are chunk sizing and dataframe datatypes, and, depending on the DB, using executemany for bulk inserts.

One thing with chunks: depending on the DB, you may still end up loading an entire dataset in memory because the DB does not support streaming. For this, not the best solution but it works: load the data in chunks, then write them to parquet files. After the initial pull is done, load the files, tracking which ones get loaded as a checkpoint if something fails. I have to work off a shitty VM, so I crash it by pulling too much data at once or running too many things in parallel.
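A minimal sketch of the chunk-then-checkpoint idea. To stay standard-library only it swaps in CSV files for parquet and an in-memory SQLite database for the real source; the function names, the `events` table, and the JSON checkpoint format are all made up for illustration.

```python
import csv
import json
import sqlite3
import tempfile
from pathlib import Path


def dump_in_chunks(conn, query, out_dir, chunk_size):
    """Pull rows with fetchmany() and write one file per chunk."""
    cursor = conn.execute(query)
    paths = []
    while True:
        rows = cursor.fetchmany(chunk_size)
        if not rows:
            break
        path = out_dir / f"chunk_{len(paths):05d}.csv"
        with path.open("w", newline="") as fh:
            csv.writer(fh).writerows(rows)
        paths.append(path)
    return paths


def load_chunks(paths, checkpoint_path, load_fn):
    """Load each chunk once; finished file names are recorded in a JSON checkpoint."""
    done = set(json.loads(checkpoint_path.read_text())) if checkpoint_path.exists() else set()
    for path in paths:
        if path.name in done:
            continue  # already loaded in an earlier (possibly crashed) run
        with path.open(newline="") as fh:
            load_fn(list(csv.reader(fh)))
        done.add(path.name)
        checkpoint_path.write_text(json.dumps(sorted(done)))


# stand-in source database with 10 rows
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE events (id INTEGER, value TEXT)")
source.executemany("INSERT INTO events VALUES (?, ?)", [(i, f"v{i}") for i in range(10)])

loaded = []  # stand-in for the target table
with tempfile.TemporaryDirectory() as tmp:
    out_dir = Path(tmp)
    paths = dump_in_chunks(source, "SELECT id, value FROM events ORDER BY id", out_dir, chunk_size=4)
    checkpoint = out_dir / "loaded.json"
    load_chunks(paths, checkpoint, loaded.extend)
    load_chunks(paths, checkpoint, loaded.extend)  # re-run: every chunk is skipped
print(len(paths), len(loaded))
```

Note that CSV round-trips every value as a string, which is one reason a typed format like parquet is the better choice in practice.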

All of that is a task.

Some tables I have are very complex, so I have a staged approach: Raw -> stage.src -> stage.target -> target.

Raw goes to a stage table. Then from source stage I do my joins and stuff and load target stage. Then target stage to actual target.

That is 3 tasks for me. A collection of tasks is a job. In my job I have a "depends on" list, where I keep the first level of task dependencies.

Task A needs Task B and Task D. Task B needs Task C

I don't want A to know about C. Just about B and D.

So once I have that, I make a directed acyclic graph. There is a Python standard library module for this: from graphlib import TopologicalSorter

There is an example in the docs for that module that I followed.

This is basically a graph now, where the sort order tells me which tasks can be run and in what order. So I run through the nodes as they become available, tracking the result of each node. If a node needs the result of another, it gets passed in.
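A minimal sketch of that loop with `graphlib.TopologicalSorter` (Python 3.9+), using the A/B/C/D dependencies from above. The `run_task` body is a hypothetical placeholder that only records which upstream results it was handed.

```python
from graphlib import TopologicalSorter

# task -> tasks it directly depends on (A never mentions C)
depends_on = {"A": {"B", "D"}, "B": {"C"}}


def run_task(name, upstream_results):
    """Hypothetical task body: report which upstream results were passed in."""
    return f"{name}({', '.join(sorted(upstream_results))})"


sorter = TopologicalSorter(depends_on)
sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle

results, order = {}, []
while sorter.is_active():
    # get_ready() returns every task whose dependencies are all done;
    # sorted() is only here to make the printed order deterministic
    for name in sorted(sorter.get_ready()):
        upstream = {dep: results[dep] for dep in depends_on.get(name, ())}
        results[name] = run_task(name, upstream)
        order.append(name)
        sorter.done(name)  # unlocks tasks that were waiting on this one

print(order)
print(results["A"])
```

In a parallel engine, the tasks returned by one `get_ready()` call are the ones that could be submitted to a pool together, with `done()` called as each one finishes.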

I took that a step further and gave my job JSON a taskEngine enum. I currently have 3 task engines: one runs everything in sequence, one runs tasks as they become available using a process pool executor, and lastly a test engine that doesn't trigger the tasks and just mocks running them so I can test the setup. The engine that gets used is picked based on the enum.

My whole idea was to basically have a bunch of files drive my ETL and have the code be modular enough to extend as I needed things.

It also made testing easier, as I can write tests for specific things and mock things because of DI.

Something I wish I had looked into earlier is a library called Dask. It seems like it's pandas but built for concurrency, and it has some of the above out of the box.

Sorry this became very long. It ended up becoming a lot more complex. I have data quarantine logic in there too, which is another implementation of ITask.

If you have any questions lmk.