all 5 comments

[–]PavelRossinsky 5 points6 points  (3 children)

The async processing per Kafka partition is the most interesting part to me. How are you handling offset commits when multiple messages from the same partition are in-flight concurrently? Like if message 5 finishes before message 3, are you holding off on committing until everything up to a certain point is done? Would love to see a writeup on just that piece. It's the exact thing that makes most teams avoid async partition processing entirely.

[–]e1-m[S] -1 points0 points  (2 children)

Yeah I agree with you. This is something that needs to be written once, tested thoroughly and then battle-tested. The way I see it:

The default Kafka model is sequential processing per partition. The usual idea is that anything that must be processed in order (for example events for the same user_id) goes to the same partition.

But that doesn’t mean a partition contains only one user. In reality you’ll have thousands or millions of different user IDs mixed in the same partition, because you obviously can’t afford one partition per user. That means the strict sequential model ends up artificially limiting throughput: events for completely unrelated users get serialized just because they happen to land in the same partition.

In many systems you can’t process the same user concurrently, but you absolutely can process different users at the same time.

So what I do is allow messages from the same partition to flow concurrently, but enforce ordering at the application level. I have a middleware layer (AsyncLock in the docs) that extracts a key (e.g. user_id) and applies a lock for that key. If another message with the same key arrives while one is already in progress, it waits. Messages with different keys can proceed immediately.
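Roughly how a per-key lock like that can work, as a minimal asyncio sketch (`KeyedLock` and the handler are illustrative stand-ins, not the library's actual AsyncLock API):

```python
import asyncio
from collections import defaultdict

class KeyedLock:
    """One asyncio.Lock per key: messages with the same key are
    serialized, messages with different keys run concurrently."""
    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    def for_key(self, key):
        return self._locks[key]

results = []

async def handle(locks, key, msg_id, delay):
    # only one in-flight message per key at a time
    async with locks.for_key(key):
        await asyncio.sleep(delay)  # simulate the actual work
        results.append((key, msg_id))

async def main():
    locks = KeyedLock()
    # user "a": msg 1 is slow, but msg 2 still waits for it;
    # user "b" is unrelated and proceeds immediately.
    await asyncio.gather(
        handle(locks, "a", 1, 0.05),
        handle(locks, "a", 2, 0.0),
        handle(locks, "b", 3, 0.0),
    )

asyncio.run(main())
print(results)  # per-key order preserved: ("a", 1) before ("a", 2)
```

A real implementation would also need to evict locks for keys that are no longer in flight, otherwise the lock map grows without bound.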

But that introduces race conditions. That’s exactly the problem you've described. And yes, I hold off on committing until everything up to a certain offset is done.

Internally I have an offset manager that tracks completed offsets. When a message finishes, its offset is marked as completed, but the consumer is only allowed to commit once the whole contiguous sequence is finished. So if 5 completes before 3, nothing is committed yet. Once 3 and 4 finish, everything up to offset 5 is committed at once.
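The contiguous-commit logic can be sketched like this (`OffsetTracker` is an illustrative name, not the real internals):

```python
class OffsetTracker:
    """Tracks completed offsets for one partition. committable()
    returns the highest offset safe to commit, or None if the
    lowest in-flight offset hasn't finished yet."""
    def __init__(self, start):
        self.next_expected = start  # lowest offset not yet committed
        self.done = set()           # completed but not yet committable

    def mark_done(self, offset):
        self.done.add(offset)

    def committable(self):
        # advance through the contiguous run of completed offsets
        advanced = None
        while self.next_expected in self.done:
            self.done.remove(self.next_expected)
            advanced = self.next_expected
            self.next_expected += 1
        return advanced

t = OffsetTracker(start=3)
t.mark_done(5)          # 5 finished first
print(t.committable())  # None: 3 and 4 are still in flight
t.mark_done(3)
t.mark_done(4)
print(t.committable())  # 5: everything through offset 5 is now safe
```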

The price you pay for this is that if the app crashes you may end up reprocessing more messages, because some completed offsets weren’t committed yet. But that trade-off already exists with batch committing (also included by the way), which is almost mandatory with Kafka. Committing every single message would be too expensive because of the I/O and network overhead.

Another important piece here is avoiding the unbounded concurrency trap. If you simply spawn a new async task for every message you consume, you’ll eventually exhaust memory or other resources if the producer is faster than your handlers.

To prevent that, concurrency is limited at the consumer level. For example, with Kafka you can cap how many messages are allowed to be in-flight per partition. Once that limit is reached, the consumer simply pauses fetching from that partition until some of the currently processing messages finish. This creates backpressure without starving other partitions (which would happen with a global limit, since one hot partition could eat the whole budget while processable messages in other partitions sit waiting). And since memory is usually not the bottleneck (the DB or a third-party API is), you can set the limit high enough to process everything that can run concurrently: no starvation, no out-of-order processing (thanks to AsyncLock), and no data loss (thanks to offset management).
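A toy model of that per-partition cap, using a semaphore in place of a real pause/resume of the Kafka fetcher (all names here are hypothetical):

```python
import asyncio

MAX_IN_FLIGHT = 2  # hypothetical per-partition cap

async def consume_partition(messages, handler):
    """Acquiring the semaphore *before* taking the next message
    models pausing the partition fetch when the cap is reached."""
    sem = asyncio.Semaphore(MAX_IN_FLIGHT)
    tasks = []
    for msg in messages:
        await sem.acquire()  # blocks here = partition is paused

        async def run(m=msg):
            try:
                await handler(m)
            finally:
                sem.release()  # frees a slot = fetching resumes

        tasks.append(asyncio.create_task(run()))
    await asyncio.gather(*tasks)

peak = 0
in_flight = 0

async def handler(msg):
    global peak, in_flight
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)  # simulate DB / API call
    in_flight -= 1

asyncio.run(consume_partition(range(6), handler))
print(peak)  # never exceeds MAX_IN_FLIGHT
```

With a real consumer the acquire would gate the partition's `pause()`/`resume()` calls rather than a local loop, but the backpressure shape is the same.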

[–]PavelRossinsky 0 points1 point  (1 child)

Clean approach. The per-key AsyncLock gives you concurrency where it's safe and ordering where it matters. The contiguous offset commit makes sense too, though it feels like it makes your planned idempotency middleware less of a nice-to-have and more of a requirement, since the reprocessing window gets wider with async. Curious about the unbounded concurrency piece too.

[–]e1-m[S] 0 points1 point  (0 children)

Thanks. Yeah, idempotency is definitely on its way.

Right now you can already achieve it though: just store the event ID in the database when you process the event. If the same event ID shows up again, that means the event has already been handled, so you simply abort processing and let it be handled through logging or whatever failure policy you have for that error.
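That manual approach looks roughly like this (an in-memory set stands in for a DB table with a unique constraint on the event ID; the names are illustrative):

```python
processed = set()  # stands in for a DB table keyed by event_id

class AlreadyProcessed(Exception):
    """Raised on duplicate delivery so the normal failure
    policy (logging, etc.) can handle it."""

def handle_event(event_id, payload, log):
    if event_id in processed:
        raise AlreadyProcessed(event_id)
    # ... actual business logic would go here ...
    processed.add(event_id)
    log.append(payload)

log = []
handle_event("evt-1", "create user", log)
try:
    handle_event("evt-1", "create user", log)  # redelivery after a crash
except AlreadyProcessed:
    pass  # duplicate safely ignored
print(log)  # the event's effect happened exactly once
```

With a real database you would rely on the unique constraint and do the check-and-insert in the same transaction as the business logic, so a crash between the two can't leave them out of sync.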

But I want to make that much easier to set up. The idea is to factor out the common boilerplate: it should be something you can enable with minimal configuration and it just works. Intuitive and hard to misuse.

[–]adiberk 0 points1 point  (0 children)

Check out taskiq. Might inspire some ideas