71 lines
2.2 KiB
Python

from typing import Annotated
from uuid import UUID
from fastapi import Depends
from sqlalchemy.ext.asyncio.session import AsyncSession
from sqlmodel import select
from backend.app.models.house import House
from backend.app.models.user import User
from ..models.owner import Owner
from ..providers.db_provider import get_session
class OwnerRepository:
def __init__(self, session: Annotated[AsyncSession, Depends(get_session)]) -> None:
self.session = session
async def get_all(self, offset: int = 0, limit: int = 100):
statement = select(Owner).offset(offset).limit(limit)
result = await self.session.execute(statement)
return result.scalars().all()
async def get_by_id(self, owner_id: UUID):
statement = select(Owner).where(Owner.id == owner_id)
result = await self.session.execute(statement)
return result.scalar_one_or_none()
async def get_by_user_id(self, user_id: UUID):
statement = select(Owner).where(Owner.user_id == user_id)
result = await self.session.execute(statement)
return result.scalar_one_or_none()
async def get_details_by_house_id(self, house_id: UUID):
statement = (
select(Owner, User)
.join(User, Owner.user_id == User.id)
.join(House, House.owner_user_id == Owner.user_id)
.where(House.id == house_id)
)
result = await self.session.execute(statement)
row = result.first()
if row:
owner, user = row
return {
"owner": owner,
"user": user
}
return None
async def save(self, owner: Owner) -> None:
"""
Save a owner to the database. If an owner with that ID already exists, do an upsert.
"""
existing = await self.get_by_id(owner.id)
if not existing:
existing = owner
for key, value in owner.model_dump(exclude_unset=True).items():
setattr(owner, key, value)
try:
self.session.add(owner)
await self.session.commit()
await self.session.refresh(owner)
except Exception as e:
await self.session.rollback()
raise e