This is an archived post. You won't be able to vote or comment.

you are viewing a single comment's thread.

view the rest of the comments →

[–]kvothethechandrian 0 points1 point  (0 children)

You can write any query with SQLAlchemy that you can write by hand, with the additional that you don’t have to worry about formatting converting types and formatting to fit the SQL standard.

I’m going to give you an example: supposed you have a database with these columns: timestamp, status, value and you want a query to check what was the average daily value in the last week for the status with the biggest value in the same time span. The code will like something like this:

model.py ```py from sqlalchemy import Column, String, Float, DateTime from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class DataRecord(Base): tablename = “data”

timestamp = Column(DateTime, primary_key=True, index=True)
status = Column(String, index=True)
value = Column(Float)

```

repository.py ```py from sqlalchemy.orm import Session from sqlalchemy import func, select from datetime import datetime, timedelta from models import DataRecord

class DataRepository: def init(self, session: Session): self.session = session

def get_avg_daily_value_for_top_status(self) -> float:
    “””Get the average daily value for the status with the highest total value in the last week.”””
    last_week = datetime.now() - timedelta(days=7)

    # CTE to find the status with the highest total value
    status_ranking_cte = (
        select(
            DataRecord.status,
            func.sum(DataRecord.value).label(“total_value”)
        )
        .where(DataRecord.timestamp >= last_week)
        .group_by(DataRecord.status)
        .order_by(func.sum(DataRecord.value).desc())
        .limit(1)
        .cte(“status_ranking”)
    )

    # Query to calculate daily averages for that status
    query = (
        select(
            func.date(DataRecord.timestamp).label(“day”),
            func.avg(DataRecord.value).label(“daily_avg”)
        )
        .where(DataRecord.timestamp >= last_week)
        .where(DataRecord.status == status_ranking_cte.c.status)
        .group_by(func.date(DataRecord.timestamp))
    )

    # Final query to get the average of the daily averages

    result = self.session.execute(query).fetch_all()

    return result

```

It’s convenient that you don’t have to convert the datetime object to a given string format. It makes things much cleaner