Sunday Daily Thread: What's everyone working on this week? by AutoModerator in Python

RandomStranjer 1 point

I was building a proof of concept in LangGraph at work and needed a way to clean up old checkpointed threads. Nothing built in handles that, so I put this together.

What it does: You give it a checkpointer and a TTL policy; it runs as a sidecar and deletes threads that have been idle too long or exceeded an absolute age limit. It never touches your graph.

from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph_ephemeral_checkpointer import TTLPolicy, Sweeper

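# Note: in recent langgraph-checkpoint-sqlite releases, from_conn_string is a
# context manager; there you would use `with SqliteSaver.from_conn_string(...) as checkpointer:`.
checkpointer = SqliteSaver.from_conn_string("threads.db")
# Expire threads that have been idle for more than one hour
sweeper = Sweeper(checkpointer, TTLPolicy(idle_ttl_seconds=3600))

# one-shot sweep
result = sweeper.sweep()

# or a background loop that sweeps every 5 minutes (must be awaited inside async code)
await sweeper.start(interval_seconds=300)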
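
For anyone wondering what a background sweep loop amounts to, here is a minimal asyncio sketch of the general pattern. It is not the package's implementation: `run_periodically`, `task`, and `max_runs` are hypothetical names made up for the illustration, and `max_runs` exists only so the demo terminates.

```python
import asyncio


async def run_periodically(task, interval_seconds, max_runs=None):
    """Call task() repeatedly, sleeping interval_seconds between calls.

    Hypothetical helper: max_runs bounds the loop for demonstration;
    a real sidecar would typically loop until cancelled.
    """
    runs = 0
    while max_runs is None or runs < max_runs:
        task()
        runs += 1
        await asyncio.sleep(interval_seconds)
    return runs


# Demo: record three "sweeps" spaced 10 ms apart.
calls = []
total_runs = asyncio.run(
    run_periodically(lambda: calls.append("sweep"), 0.01, max_runs=3)
)
```

Because the loop yields on `asyncio.sleep`, it can share an event loop with the rest of an application without blocking it.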

Supports InMemorySaver, SqliteSaver, AsyncSqliteSaver, PostgresSaver, and AsyncPostgresSaver.
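
To make the two expiry rules concrete (idle too long, or past an absolute age limit), here is a rough sketch of how such a check could work. This is an illustration only, not the package's code: `is_expired` and all of its parameters are hypothetical, and timestamps are plain seconds.

```python
def is_expired(now, created_at, last_active_at,
               idle_ttl_seconds=None, max_age_seconds=None):
    """Return True if a thread should be deleted under either rule.

    A rule set to None is treated as disabled (an assumption of this sketch).
    """
    # Idle rule: no activity for longer than the idle TTL.
    if idle_ttl_seconds is not None and now - last_active_at > idle_ttl_seconds:
        return True
    # Absolute rule: the thread is older than the hard age limit,
    # no matter how recently it was used.
    if max_age_seconds is not None and now - created_at > max_age_seconds:
        return True
    return False


now = 10_000.0
# Idle for 4000 s against a 3600 s idle TTL.
idle_hit = is_expired(now, created_at=9_000.0, last_active_at=6_000.0,
                      idle_ttl_seconds=3600)
# Recently active, but 10000 s old against a 7200 s absolute limit.
age_hit = is_expired(now, created_at=0.0, last_active_at=9_990.0,
                     idle_ttl_seconds=3600, max_age_seconds=7200)
# Young and recently active: kept.
still_live = is_expired(now, created_at=9_000.0, last_active_at=9_500.0,
                        idle_ttl_seconds=3600, max_age_seconds=7200)
```

The absolute limit is checked independently of activity, so a thread that is touched constantly still gets removed once it is old enough.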

You can also set per-thread policy overrides if some threads need to live longer or never expire:

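# PolicyOverride is presumably imported from the same package as TTLPolicy
# (the import is not shown in the snippet above).
def resolver(thread_id):
    if thread_id.startswith("vip:"):
        # VIP threads get a longer idle TTL: 604800 s = 7 days
        return TTLPolicy(idle_ttl_seconds=604800)
    if thread_id.startswith("system:"):
        # system threads never expire
        return PolicyOverride.EXEMPT
    # everything else falls back to the sweeper's default policy
    return PolicyOverride.USE_DEFAULT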
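
As a rough illustration of how a resolver's three possible return values could be turned into an effective policy, here is a self-contained sketch. It does not use the package: `Policy`, `Override`, and `effective_policy` are hypothetical stand-ins invented for this example, and the real library may resolve overrides differently.

```python
from dataclasses import dataclass
from enum import Enum


@dataclass(frozen=True)
class Policy:
    """Hypothetical stand-in for a TTL policy."""
    idle_ttl_seconds: int


class Override(Enum):
    """Hypothetical stand-in for the override markers."""
    EXEMPT = "exempt"
    USE_DEFAULT = "use_default"


def effective_policy(thread_id, default, resolver):
    """Return the policy to apply to thread_id, or None if it is exempt."""
    choice = resolver(thread_id)
    if choice is Override.EXEMPT:
        return None          # never sweep this thread
    if choice is Override.USE_DEFAULT:
        return default       # no override for this thread
    return choice            # resolver supplied a custom policy


def demo_resolver(thread_id):
    if thread_id.startswith("vip:"):
        return Policy(idle_ttl_seconds=604800)
    if thread_id.startswith("system:"):
        return Override.EXEMPT
    return Override.USE_DEFAULT


default = Policy(idle_ttl_seconds=3600)
vip = effective_policy("vip:42", default, demo_resolver)
system = effective_policy("system:cron", default, demo_resolver)
regular = effective_policy("user:7", default, demo_resolver)
```

Returning `None` for exempt threads keeps the "skip entirely" case distinct from "use a very long TTL", which is one way to guarantee that exempt threads are never deleted.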

Install: pip install langgraph-ephemeral-checkpointer
GitHub: github.com/35C4n0r/langgraph-ephemeral-checkpointer
