2025-02-19 16:25:48 +01:00

43 lines
1.4 KiB
Python

from sqlalchemy.ext.asyncio.session import AsyncSession
from typing import Annotated
from fastapi import Depends
from ..providers.db_provider import get_session
from ..models.owner import Owner
from sqlmodel import select
from uuid import UUID
class OwnerRepository:
    """Data-access helper for ``Owner`` rows backed by an async session."""

    def __init__(self, session: Annotated[AsyncSession, Depends(get_session)]) -> None:
        """Store the session that every query of this repository runs on.

        Args:
            session: Async session; the annotation lets FastAPI inject it
                through ``get_session``.
        """
        self.session = session

    async def get_all(self, offset: int = 0, limit: int = 100):
        """Return one page of owners.

        Args:
            offset: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            The ``Owner`` instances produced by the query.
        """
        # NOTE: the statement has no ORDER BY, so the database is free to
        # return rows in any order between pages.
        statement = select(Owner).offset(offset).limit(limit)
        result = await self.session.execute(statement)
        return result.scalars().all()

    async def get_by_id(self, owner_id: UUID):
        """Return the owner whose ``id`` equals ``owner_id``, or ``None``.

        Args:
            owner_id: Identifier to look up.
        """
        statement = select(Owner).where(Owner.id == owner_id)
        result = await self.session.execute(statement)
        return result.scalar_one_or_none()

    async def save(self, owner: Owner) -> None:
        """Save an owner to the database, updating the row if the ID exists.

        When an owner with the same ID is already stored, the fields that
        were explicitly set on ``owner`` are copied onto the stored instance
        and that instance is persisted; in that case ``owner`` itself is not
        attached to the session. Otherwise ``owner`` is inserted and
        refreshed in place.

        Args:
            owner: The owner carrying the values to persist.

        Raises:
            Exception: Whatever the commit or refresh raises; the session is
                rolled back before the error propagates.
        """
        existing = await self.get_by_id(owner.id)
        if existing is None or existing is owner:
            # Nothing stored yet (or ``owner`` is already the session's own
            # instance): persist the object we were given.
            target = owner
        else:
            # Apply the incoming values to the instance the session loaded.
            # Adding ``owner`` directly would register a second object with
            # the same primary key instead of updating the stored row.
            target = existing
            for key, value in owner.model_dump(exclude_unset=True).items():
                setattr(target, key, value)
        try:
            self.session.add(target)
            await self.session.commit()
            await self.session.refresh(target)
        except Exception:
            await self.session.rollback()
            # Bare ``raise`` keeps the original traceback intact.
            raise