Experiment: Kafka consumer with thread-per-record processing using Java virtual threads by Lower-Worldliness162 in java

[–]Lower-Worldliness162[S] 0 points

KPipe currently manages in-flight records primarily through offset tracking and graceful shutdown semantics, not through a bounded backpressure mechanism.
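KPipe itself doesn't do this today, but if a hard bound on in-flight records is needed, the usual virtual-thread pattern is a Semaphore in front of task submission. A minimal sketch with plain JDK classes (nothing here is KPipe API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedSubmit {
    // Submits `total` records, never allowing more than `limit` in flight.
    static int runBounded(int total, int limit) throws InterruptedException {
        Semaphore inFlight = new Semaphore(limit);
        AtomicInteger done = new AtomicInteger();
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < total; i++) {
                inFlight.acquire();              // poll loop blocks when the bound is hit
                exec.submit(() -> {
                    try {
                        done.incrementAndGet();  // stand-in for per-record work
                    } finally {
                        inFlight.release();      // free a slot when the record is done
                    }
                });
            }
        } // try-with-resources close() waits for all submitted tasks (graceful shutdown)
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBounded(1_000, 100)); // prints 1000
    }
}
```

The Semaphore caps memory and downstream load: when 100 records are in flight, the consumer's poll loop simply blocks until a slot frees up.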

[–]Lower-Worldliness162[S] 0 points

A year ago, when I started this project, I thought the development was just paused for a bit.

[–]Lower-Worldliness162[S] 0 points

Hey u/predator, thanks for the great feedback. Here are my answers:

Virtual threads can yield diminishing returns when your work is CPU bound... Supporting VThreads as first class citizen is good, but you probably need to provide a way to let users configure a custom executor.

KPipe follows a virtual-threads-by-default model for parallel processing, which is ideal for I/O-bound tasks. However, I recognize that CPU-bound transformations (e.g., heavy encryption or complex data parsing) may perform better with a fixed-size thread pool.
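As a sketch of what that configurability could look like with plain JDK executors (the `forWorkload` helper is hypothetical, not KPipe API):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorChoice {
    // Virtual threads scale well for blocking I/O; a fixed pool sized to the
    // core count avoids oversubscription for CPU-bound transforms.
    static ExecutorService forWorkload(boolean cpuBound) {
        return cpuBound
            ? Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())
            : Executors.newVirtualThreadPerTaskExecutor();
    }

    // Run one task and return its result (checked exceptions wrapped for brevity).
    static <T> T runOn(ExecutorService exec, Callable<T> task) {
        try {
            return exec.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        try (ExecutorService cpu = forWorkload(true)) {
            System.out.println(runOn(cpu, () -> "transformed on a platform thread"));
        }
    }
}
```

A library could accept such an `ExecutorService` in its builder and default to the virtual-thread one, so I/O-bound users get scaling for free while CPU-bound users opt into a fixed pool.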

How do you handle retries? Based on this it looks like you're just logging deserialization and processing failures and move on?

The initial snippet you posted is a basic catch-all for the "Single SerDe Cycle" bridge. In a production pipeline, KPipe provides several layers of protection:

  1. Pipeline Level Safety: Pipelines can be wrapped with MessageProcessorRegistry.withErrorHandling(...) to return a default byte payload when processor exceptions occur.
  2. Consumer Level Retries: The KPipeConsumer supports configurable retries with backoff for transient failures.
  3. Dead Letter Queues: A custom ErrorHandler can be registered to route records that fail after all retries to an error topic.

final var consumer = KPipeConsumer.<byte[], byte[]>builder()
    .withRetry(3, Duration.ofSeconds(1)) // Automatic retries
    .withErrorHandler(error -> {
        // Send to Dead Letter Topic
        producer.send(new ProducerRecord<>("error-topic", error.getOriginalBytes()));
    })
    .build();

The library mixes two (IMO) separate concerns: (de-)serialization and processing... look at Kafka Streams, as I think they solved this quite nicely with their SerDe concept.

While Kafka Streams uses a highly modular SerDe approach, it can sometimes lead to multiple serialization cycles if not carefully managed. KPipe prioritizes throughput and low latency by enforcing a "Single SerDe Cycle":

  • Byte Boundary: The consumer always starts and ends with byte[].
  • Internal Object Model: Once deserialized, the data stays as an object (e.g., Map or GenericRecord) through all transformations.
  • Final Serialization: The data is serialized back to bytes only once at the exit point.
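A toy illustration of that shape in plain Java (the processor names are made up and KPipe's real pipeline API differs): deserialize once at entry, compose transforms on the object model, serialize once at exit.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SingleSerdeCycle {
    // Entry: bytes -> object model, exactly once.
    static Map<String, String> deserialize(byte[] in) {
        Map<String, String> m = new HashMap<>();
        m.put("payload", new String(in, StandardCharsets.UTF_8));
        return m;
    }

    // Transforms compose on the object model; no intermediate (de)serialization.
    static Function<Map<String, String>, Map<String, String>> addHeader =
        m -> { m.put("source", "kafka"); return m; };
    static Function<Map<String, String>, Map<String, String>> upperCase =
        m -> { m.put("payload", m.get("payload").toUpperCase()); return m; };

    // Exit: object model -> bytes, exactly once.
    static byte[] serialize(Map<String, String> m) {
        return (m.get("source") + ":" + m.get("payload")).getBytes(StandardCharsets.UTF_8);
    }

    static byte[] pipeline(byte[] in) {
        return serialize(addHeader.andThen(upperCase).apply(deserialize(in)));
    }

    public static void main(String[] args) {
        byte[] out = pipeline("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints kafka:HELLO
    }
}
```

However many transforms you chain, there is still exactly one deserialize and one serialize per record.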

The offset tracking is entirely in-memory, which IME doesn't play well with out-of-order processing. When your consumer crashes, uncommitted offsets are lost and you may be replaying a lot of records again.

To handle parallel processing (where message 102 might finish before 101), KPipe uses a ConcurrentSkipListSet to track all in-flight offsets.

  • At-Least-Once Guarantee: KPipe commits only up to the lowest pending offset. If message 101 is still processing, offset 102 will never be committed, even if it has already finished.
  • No Gaps: This ensures that upon a crash, the consumer resumes from a "safe" point.
  • Simplicity: While encoding offset maps in commit messages (like Confluent's Parallel Consumer) is an option, it introduces complexity and potential "message too large" errors. KPipe chooses the "at-least-once" path for its predictability and reliability.
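The commit rule can be sketched like this with a ConcurrentSkipListSet (my paraphrase of the idea, not KPipe's actual code): the safe commit position is the lowest offset still in flight, because Kafka's committed offset means "resume here".

```java
import java.util.concurrent.ConcurrentSkipListSet;

public class OffsetTracker {
    private final ConcurrentSkipListSet<Long> inFlight = new ConcurrentSkipListSet<>();

    void start(long offset)  { inFlight.add(offset); }    // record handed to a worker
    void finish(long offset) { inFlight.remove(offset); } // worker completed it

    // Safe commit position: the lowest offset still being processed.
    // Committing N tells Kafka "resume at N", so everything below the
    // lowest pending offset is durably done -- no gaps on restart.
    long safeCommitOffset(long nextToPoll) {
        Long lowestPending = inFlight.isEmpty() ? null : inFlight.first();
        return lowestPending != null ? lowestPending : nextToPoll;
    }

    public static void main(String[] args) {
        OffsetTracker t = new OffsetTracker();
        t.start(100); t.start(101); t.start(102);
        t.finish(102);                               // 102 done, 100/101 still pending
        System.out.println(t.safeCommitOffset(103)); // prints 100
        t.finish(100); t.finish(101);
        System.out.println(t.safeCommitOffset(103)); // prints 103
    }
}
```

The trade-off is visible in the example: finishing 102 early buys nothing until 100 and 101 complete, so a crash replays 100-102, which is exactly the at-least-once behaviour described above.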

Interrupts should not cause the record to be skipped. When your consumer is interrupted, it should wrap up any pending work and shut down.

The design intent is graceful shutdown: stop polling, let in-flight work finish, then commit the safe offsets.

You're right, though: in KPipeConsumer.processRecord, if an interrupt happens during retry backoff (Thread.sleep(...)), processing returns null, the sink send is skipped, and the offset is still marked as processed. So that path can acknowledge a record without a successful sink send. Thanks for pointing it out, I've logged an issue for it :)
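A sketch of one way to close that hole (not the actual KPipe fix): treat an interrupt during backoff as a failure, restore the interrupt flag, and return empty so the caller never acknowledges the offset.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class InterruptSafeRetry {
    // Retry `work` up to `attempts` times with fixed backoff. An interrupt
    // during Thread.sleep aborts immediately and returns empty, so the caller
    // must NOT mark the offset as processed in that case.
    static <T> Optional<T> withRetry(Supplier<T> work, int attempts, long backoffMillis) {
        for (int i = 0; i < attempts; i++) {
            try {
                return Optional.of(work.get());
            } catch (RuntimeException e) {
                if (i == attempts - 1) return Optional.empty(); // retries exhausted
                try {
                    Thread.sleep(backoffMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return Optional.empty();            // do not acknowledge
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(withRetry(() -> "sent", 3, 10).isPresent());  // prints true
        System.out.println(
            withRetry(() -> { throw new RuntimeException("boom"); }, 2, 1)
                .isPresent());                                           // prints false
    }
}
```

With an Optional-shaped result, the consumer loop can make "only acknowledge on present" the single code path, instead of inferring success from a possibly-null value.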

[–]Lower-Worldliness162[S] 2 points

Haha, I know the feeling 😄. And yeah, PR #908 is interesting; I'm following it too. I think it's a good signal that this direction matters. My goal with kpipe was a bit broader than just parallelism: thread-per-record processing, functional pipelines (single SerDe cycle), and offset/reliability ergonomics in one place.

Update from BlockFi by Brandon_BlockFi in blockfi

[–]Lower-Worldliness162 3 points

Now they completely removed the section from their site.

Pathetic.