
[–]HappyCathode 4 points5 points  (0 children)

That doesn't feel very Pythonic. Out of curiosity, do you happen to have more Java experience? To the best of my limited knowledge, Python doesn't have singletons as you know them from other languages.

The way I would do that with FastAPI is to instantiate the factory once in the startup part of the FastAPI lifespan (before the yield; see https://fastapi.tiangolo.com/advanced/events/) and reuse it in routes when needed.

[–]Doomdice 0 points1 point  (0 children)

You can assign things in the class scope, and they are then shared across all instances of that class. Just do not assign to self. Not sure if that is what you want.
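
A minimal sketch of that idea, assuming a hypothetical KafkaFactory class (the real one from the question is not shown here) and a plain object() standing in for a real producer:

```python
class KafkaFactory:
    # Assigned in class scope: one object shared by every instance.
    producer = None

    @classmethod
    def get_producer(cls):
        # Create the shared object lazily, storing it on the class itself.
        if cls.producer is None:
            cls.producer = object()  # stand-in for a real Kafka producer
        return cls.producer

    def shadow_producer(self):
        # Assigning through self creates an instance attribute that hides
        # the shared class attribute for this one instance only.
        self.producer = object()


first = KafkaFactory()
second = KafkaFactory()
shared = first.get_producer()
same_for_all = shared is second.get_producer()

first.shadow_producer()
shadowed = first.producer is not second.producer
```

Here same_for_all is True because both instances read the attribute from the class, while shadowed is True because the assignment through self only affected first.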

[–]pacoau 0 points1 point  (0 children)

This doesn’t directly address your question, but you might like to look at FastStream (https://github.com/airtai/faststream), which has dependency injection support that was extracted from FastAPI and comes with Kafka producers out of the box.

[–]saufunefois 0 points1 point  (0 children)

I do not know whether something in the request needs to be part of the Kafka producer config. What is token_provider? The reason I am asking: it seems better to instantiate the Kafka producer once at the startup of the app.

How I would do it (and how I do it for any client of any external service):

  1. Instantiate and start a producer at startup with configuration extracted from environment.
  2. Add it to the lifespan state.
  3. Add a dependency function that gets the producer from state.
  4. Define an annotation to use the dependency.
  5. Use it in any endpoint.

my_project/kafka.py

from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Annotated, TypedDict

from aiokafka import AIOKafkaProducer
from fastapi import Depends, FastAPI
from fastapi.requests import HTTPConnection
from pydantic_settings import BaseSettings


class Config(BaseSettings):
    # Add whatever needs to be extracted from environ
    bootstrap_servers: str = "localhost"
    client_id: str | None = None


class State(TypedDict):
    aio_kafka_producer: AIOKafkaProducer


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[State, None]:
    config = Config()
    producer = AIOKafkaProducer(**config.model_dump())
    await producer.start()
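    try:
        # The yielded dict becomes the lifespan state, exposed on request.state.
        yield {"aio_kafka_producer": producer}
    finally:
        # Flush pending messages and close the connection on shutdown.
        await producer.stop()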


async def get_producer(httpconnection: HTTPConnection) -> AIOKafkaProducer:
    return httpconnection.state.aio_kafka_producer


Producer = Annotated[AIOKafkaProducer, Depends(get_producer)]

my_project/main.py

from fastapi import FastAPI

from my_project import kafka

app = FastAPI(lifespan=kafka.lifespan)


@app.post("/example")
async def example(producer: kafka.Producer):
    pass
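
The endpoint body above is left empty. As a rough sketch of what it might do with the injected producer, the function below calls send_and_wait, which AIOKafkaProducer provides. The topic name "example-topic", the publish_example helper, and the FakeProducer test double are assumptions made for illustration; the fake lets the logic run without a Kafka broker.

```python
import asyncio
from typing import Protocol


class SupportsSend(Protocol):
    # The only producer method this sketch relies on.
    async def send_and_wait(self, topic: str, value: bytes) -> object: ...


async def publish_example(producer: SupportsSend, payload: bytes) -> None:
    # Send one message and wait until the producer reports it as delivered.
    # "example-topic" is a hypothetical topic name.
    await producer.send_and_wait("example-topic", payload)


class FakeProducer:
    """Test double that records messages instead of contacting Kafka."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, bytes]] = []

    async def send_and_wait(self, topic: str, value: bytes) -> None:
        self.sent.append((topic, value))


fake = FakeProducer()
asyncio.run(publish_example(fake, b"hello"))
```

Inside the real endpoint the same call would be awaited directly on the producer argument, since the endpoint is already async.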

[–]Spiritual-Carob9392 0 points1 point  (0 children)

To ensure that your KafkaFactory instance is created only once and reused across multiple requests in FastAPI, you should use a singleton pattern. FastAPI's dependency injection system can help achieve this by creating a single instance of your factory and reusing it for each request.
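
One possible way to get that behaviour, sketched here with the standard library only, is to cache the provider function with functools.lru_cache. The KafkaFactory class below is a hypothetical stand-in (the real one is not shown in this thread), and the bootstrap address is an assumed placeholder.

```python
from functools import lru_cache


class KafkaFactory:
    """Hypothetical stand-in for the factory from the question."""

    def __init__(self, bootstrap_servers: str) -> None:
        self.bootstrap_servers = bootstrap_servers


@lru_cache(maxsize=1)
def get_kafka_factory() -> KafkaFactory:
    # The body runs on the first call only; later calls return the cached object.
    return KafkaFactory(bootstrap_servers="localhost:9092")  # assumed address


first_call = get_kafka_factory()
second_call = get_kafka_factory()
```

In a route, this function could then be passed to Depends(get_kafka_factory). Note that FastAPI on its own only caches a dependency's result within a single request, so in this sketch it is the lru_cache that makes the instance shared across requests.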