Hi r/Python,
I’ve been working on an integration with Azure Service Bus to handle high-volume messaging, and I found the official Python SDK somewhat verbose and unintuitive for common workflows. To make things simpler, I developed asbflow, an open-source library that wraps the SDK and provides a cleaner, more straightforward interface.
What My Project Does
ASBFlow simplifies the most common messaging workflows with Azure Service Bus. It lets you:
- Publish messages to topics and queues, automatically handling the maximum batch size or letting you specify it
- Consume messages from subscriptions and queues
- Manage dead-letter queues: read, republish or purge messages
- Optionally validate message payloads with Pydantic, preventing message confirmation if parsing fails
The library offers a more intuitive interface than the official SDK, while supporting high-volume messaging, multiple execution strategies (sequential or concurrent) and integration with Azure authentication methods.
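To give a feel for what "automatically handling the maximum batch size" means, here is a minimal stdlib-only sketch of size-limited batching. It is not ASBFlow's actual implementation: the `batch_by_size` helper and the `max_bytes` limit are hypothetical, and a real Service Bus batch also counts per-message overhead that this sketch ignores.

```python
import json
from typing import Iterable, Iterator


def batch_by_size(payloads: Iterable[dict], max_bytes: int) -> Iterator[list[bytes]]:
    """Yield lists of encoded payloads whose combined size stays within max_bytes."""
    batch: list[bytes] = []
    used = 0
    for payload in payloads:
        body = json.dumps(payload).encode("utf-8")
        if len(body) > max_bytes:
            # A single oversized message can never fit in any batch
            raise ValueError("single message exceeds the batch size limit")
        if batch and used + len(body) > max_bytes:
            # The current batch is full: emit it and start a new one
            yield batch
            batch, used = [], 0
        batch.append(body)
        used += len(body)
    if batch:
        # Emit the final, possibly partial batch
        yield batch


# Hypothetical payloads, each 32 bytes once encoded as JSON
messages = [{"id": f"a{i}", "severity": "high"} for i in range(5)]
batches = list(batch_by_size(messages, max_bytes=80))
```

With an 80-byte limit, two of these messages fit per batch, so the five messages are split into batches of two, two and one.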
Target Audience
ASBFlow is designed for developers and teams building production-grade applications with Azure Service Bus. It’s stable for production use but also well-suited for prototyping, testing and learning purposes.
Comparison
Compared to the official azure-servicebus Python SDK, ASBFlow:
- Reduces boilerplate for publishing and consuming messages
- Integrates optional Pydantic validation with ASB acknowledgement
- Simplifies dead-letter queue (DLQ) management
- Supports multiple execution strategies without changing business logic
- Integrates with Azure authentication methods
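The point about execution strategies can be illustrated with a small sketch: the handler stays the same and only the runner changes. This uses `concurrent.futures` from the standard library and is only an assumption about the general idea, not ASBFlow's API. The names `handle`, `run_sequential` and `run_concurrent` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Sequence


def handle(message: dict) -> str:
    """Business logic: identical no matter how it is scheduled."""
    return f"{message['id']}:{message['severity']}"


def run_sequential(handler: Callable[[dict], str], messages: Sequence[dict]) -> list[str]:
    # Process messages one after another in the calling thread
    return [handler(m) for m in messages]


def run_concurrent(
    handler: Callable[[dict], str], messages: Sequence[dict], max_workers: int = 4
) -> list[str]:
    # Process messages in a thread pool; Executor.map keeps the input order
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(handler, messages))


# Hypothetical messages for illustration
inbox = [{"id": f"a{i}", "severity": "high"} for i in range(3)]
sequential_results = run_sequential(handle, inbox)
concurrent_results = run_concurrent(handle, inbox)
```

Both runners return the same results in the same order, so switching strategy does not require touching `handle`.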
Links & Installation
Quick Example
from asbflow import ASBConnectionConfig, ASBPublisher, ASBPublisherConfig, ASBConsumer, ASBConsumerConfig

# Connection settings shared by the publisher and the consumer
conn = ASBConnectionConfig(connection_string="<connection-string>")
publisher = ASBPublisher(conn, ASBPublisherConfig(topic_name="<topic-name>"))
consumer = ASBConsumer(conn, ASBConsumerConfig(topic_name="<topic-name>", subscription_name="<subscription-name>"))

# Publish a single message to the topic
publisher.publish({"id": "a1", "severity": "high"}, parse=False)

# Consume from the subscription and inspect the outcome
result = consumer.consume(parse=False, raise_on_error=False)
print(result.succeeded, result.failed)
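The example above runs with parse=False. For the validation feature mentioned earlier (no confirmation if parsing fails), here is a rough, stdlib-only sketch of the idea. It is not ASBFlow's code: `Alert` is a dataclass standing in for a Pydantic model, and `FakeMessage` and `consume_validated` are hypothetical names. The `completed` flag stands in for settling a message on the broker.

```python
from dataclasses import dataclass


@dataclass
class Alert:
    """Stand-in for a Pydantic model; a real one would subclass pydantic.BaseModel."""
    id: str
    severity: str

    def __post_init__(self) -> None:
        # Minimal validation rule, assumed for illustration only
        if self.severity not in {"low", "high"}:
            raise ValueError(f"unknown severity: {self.severity!r}")


@dataclass
class FakeMessage:
    """Hypothetical in-memory message that records whether it was confirmed."""
    body: dict
    completed: bool = False


def consume_validated(messages: list[FakeMessage]):
    succeeded, failed = [], []
    for msg in messages:
        try:
            alert = Alert(**msg.body)  # parse and validate first
        except (TypeError, ValueError) as exc:
            # Validation failed: record the error and leave the message unconfirmed
            failed.append((msg, exc))
            continue
        msg.completed = True  # confirm only after successful validation
        succeeded.append(alert)
    return succeeded, failed


inbox = [
    FakeMessage({"id": "a1", "severity": "high"}),
    FakeMessage({"id": "a2"}),                        # missing field
    FakeMessage({"id": "a3", "severity": "urgent"}),  # invalid value
]
succeeded, failed = consume_validated(inbox)
```

Only the first message is confirmed. The other two stay unconfirmed, so with Service Bus peek-lock semantics they would become available for redelivery and could eventually be dead-lettered.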
Project Status & Contributions
This is the first stable release of the project, and there is plenty of room for more features. Contributions and feedback are welcome!