Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
The diff you're trying to view is too large. We only load the first 3000 changed files.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"git.ignoreLimitWarning": true
}
Binary file added backend/alembic/__pycache__/env.cpython-312.pyc
Binary file not shown.
Binary file not shown.
Binary file not shown.
53 changes: 53 additions & 0 deletions backend/alembic/versions/a1b2c3d4e5f6_add_permissions_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""add permissions and role_permissions tables

Revision ID: a1b2c3d4e5f6
Revises: 09fac9965b5b
Create Date: 2026-04-15 10:00:00.000000

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = 'a1b2c3d4e5f6'
down_revision: Union[str, None] = '09fac9965b5b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('permissions',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('name', sa.String(length=50), nullable=False),
sa.Column('description', sa.String(length=255), nullable=True),
sa.Column('resource', sa.String(length=30), nullable=False),
sa.Column('action', sa.String(length=20), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_permissions_action'), 'permissions', ['action'], unique=False)
op.create_index(op.f('ix_permissions_name'), 'permissions', ['name'], unique=True)
op.create_index(op.f('ix_permissions_resource'), 'permissions', ['resource'], unique=False)

op.create_table('role_permissions',
sa.Column('role_id', sa.Integer(), nullable=False),
sa.Column('permission_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['permission_id'], ['permissions.id'], ),
sa.ForeignKeyConstraint(['role_id'], ['roles.id'], ),
sa.PrimaryKeyConstraint('role_id', 'permission_id'),
sa.UniqueConstraint('role_id', 'permission_id', name='uq_role_permission')
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('role_permissions')
op.drop_index(op.f('ix_permissions_resource'), table_name='permissions')
op.drop_index(op.f('ix_permissions_name'), table_name='permissions')
op.drop_index(op.f('ix_permissions_action'), table_name='permissions')
op.drop_table('permissions')
# ### end Alembic commands ###
Binary file added backend/app/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
Binary file added backend/app/__pycache__/main.cpython-312.pyc
Binary file not shown.
Binary file added backend/app/api/__pycache__/deps.cpython-312.pyc
Binary file not shown.
Binary file added backend/app/api/__pycache__/routers.cpython-312.pyc
Binary file not shown.
75 changes: 75 additions & 0 deletions backend/app/api/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,81 @@ async def get_current_active_superuser(
return current_user


def require_permission(permission_name: str):
"""
Dependency factory for checking if a user has a specific permission.
Admin users have all permissions.
"""

async def permission_checker(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if not current_user.has_permission(permission_name):
logger.warning(
f"User {current_user.id} attempted action without permission: {permission_name}"
)
raise UserAccessError(error_type="insufficient_permissions")
return current_user

return permission_checker


def require_any_permission(permission_names: list[str]):
"""
Dependency factory for checking if a user has any of the specified permissions.
Admin users have all permissions.
"""

async def permission_checker(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if not current_user.has_any_permission(permission_names):
logger.warning(
f"User {current_user.id} attempted action without any of permissions: {permission_names}"
)
raise UserAccessError(error_type="insufficient_permissions")
return current_user

return permission_checker


def require_all_permissions(permission_names: list[str]):
"""
Dependency factory for checking if a user has all of the specified permissions.
Admin users have all permissions.
"""

async def permission_checker(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if not current_user.has_all_permissions(permission_names):
logger.warning(
f"User {current_user.id} attempted action without all permissions: {permission_names}"
)
raise UserAccessError(error_type="insufficient_permissions")
return current_user

return permission_checker


def require_role(role_name: str):
"""
Dependency factory for checking if a user has a specific role.
"""

async def role_checker(
current_user: Annotated[User, Depends(get_current_user)],
) -> User:
if current_user.role is None or current_user.role.name != role_name:
logger.warning(
f"User {current_user.id} attempted action without role: {role_name}"
)
raise UserAccessError(error_type="insufficient_permissions")
return current_user

return role_checker


async def pagination_params(skip: int = 0, limit: int = 100) -> PaginationParams:
"""
Get pagination parameters.
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
145 changes: 145 additions & 0 deletions backend/app/api/endpoints/permissions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""
Permission management endpoints.
"""

from fastapi import APIRouter, Depends, Request, status
from fastapi.responses import JSONResponse

from app.api.deps import (
CurrentSuperUser,
CurrentUser,
PaginationDep,
UnitOfWorkDep,
)
from app.core.exceptions import ValidationError
from app.models.schemas.role import (
PermissionCreate,
PermissionPublic,
PermissionUpdate,
)
from app.utils.response import create_response

router = APIRouter(prefix="/permissions", tags=["Permissions"])


@router.get("/", response_model=list[PermissionPublic])
async def get_permissions(
current_user: CurrentUser,
uow: UnitOfWorkDep,
pagination: PaginationDep,
) -> list[PermissionPublic]:
"""
Get all permissions with pagination.
Requires authentication.
"""
permissions = await uow.permissions.get_all(session=uow.session)
return [PermissionPublic.model_validate(perm) for perm in permissions]


@router.get("/by-resource/{resource}", response_model=list[PermissionPublic])
async def get_permissions_by_resource(
current_user: CurrentUser,
resource: str,
uow: UnitOfWorkDep,
) -> list[PermissionPublic]:
"""
Get permissions by resource.
Requires authentication.
"""
permissions = await uow.permissions.get_by_resource(session=uow.session, resource=resource)
return [PermissionPublic.model_validate(perm) for perm in permissions]


@router.get("/{permission_id}", response_model=PermissionPublic)
async def get_permission_by_id(
current_user: CurrentUser,
permission_id: int,
uow: UnitOfWorkDep,
) -> PermissionPublic:
"""
Get permission by ID.
Requires authentication.
"""
permission = await uow.permissions.get(session=uow.session, id=permission_id)
if not permission:
raise ValidationError("permission_not_found")
return PermissionPublic.model_validate(permission)


@router.post("/", response_model=PermissionPublic, status_code=status.HTTP_201_CREATED)
async def create_permission(
current_superuser: CurrentSuperUser,
permission_in: PermissionCreate,
uow: UnitOfWorkDep,
) -> PermissionPublic:
"""
Create a new permission.
Requires admin privileges.
"""
existing_permission = await uow.permissions.get_by_name(
session=uow.session,
name=permission_in.name,
)
if existing_permission:
raise ValidationError("permission_exists")

new_permission = await uow.permissions.create(
session=uow.session,
obj_in=permission_in,
)
return PermissionPublic.model_validate(new_permission)


@router.patch("/{permission_id}", response_model=PermissionPublic)
async def update_permission(
current_superuser: CurrentSuperUser,
permission_id: int,
permission_in: PermissionUpdate,
uow: UnitOfWorkDep,
) -> PermissionPublic:
"""
Update a permission.
Requires admin privileges.
"""
permission = await uow.permissions.get(session=uow.session, id=permission_id)
if not permission:
raise ValidationError("permission_not_found")

if permission_in.name and permission_in.name != permission.name:
existing_permission = await uow.permissions.get_by_name(
session=uow.session,
name=permission_in.name,
)
if existing_permission:
raise ValidationError("permission_exists")

updated_permission = await uow.permissions.update(
session=uow.session,
db_obj=permission,
obj_in=permission_in,
)
return PermissionPublic.model_validate(updated_permission)


@router.delete("/{permission_id}")
async def delete_permission(
current_superuser: CurrentSuperUser,
permission_id: int,
uow: UnitOfWorkDep,
request: Request,
) -> JSONResponse:
"""
Delete a permission.
Requires admin privileges.
"""
permission = await uow.permissions.get(session=uow.session, id=permission_id)
if not permission:
raise ValidationError("permission_not_found")

await uow.permissions.delete(session=uow.session, obj=permission)

return create_response(
status_code=status.HTTP_200_OK,
message=f"Permission {permission.name} deleted successfully",
request=request,
)
Loading