| |
| |
| |
| |
| |
| |
| |
|
|
| import sys |
| from typing import Optional, List, Dict, Any |
|
|
| |
| |
| |
| from fundaments import postgresql as db |
| |
|
|
|
|
| |
| try: |
| import asyncpg |
| except ImportError: |
| print("Error: The 'asyncpg' library is required. Please install it with 'pip install asyncpg'.") |
| sys.exit(1) |
|
|
|
|
class AccessControl:
    """
    Asynchronous helper for role/permission based access control.

    Every method sends its SQL through ``db.execute_secured_query`` and
    passes values as positional query parameters (``$1``, ``$2``, ...);
    no SQL text is ever built from caller input. The database module is
    used as-is and is never modified by this class.

    Error contract: any exception raised while running a query is re-raised
    as a plain ``Exception`` whose message starts with ``"Failed to ..."``;
    the original error is attached as ``__cause__``.

    How to extend this class:
        Add a new asynchronous method that calls
        ``db.execute_secured_query``. For example:

            async def get_user_last_login(self):
                sql = "SELECT last_login FROM users WHERE id = $1"
                result = await db.execute_secured_query(
                    sql, self.user_id, fetch_method='fetchrow'
                )
                return result['last_login'] if result else None
    """

    def __init__(self, user_id: Optional[int] = None):
        """
        Bind the helper to a user.

        Args:
            user_id: Id of the user the per-user methods operate on.
                ``None`` means "no user": the read methods then return an
                empty/negative answer and the role-assignment methods raise.
        """
        self.user_id = user_id

    async def has_permission(self, permission_name: str) -> bool:
        """
        Check whether the bound user holds a permission through any role.

        Args:
            permission_name: Value compared against ``user_permissions.name``.

        Returns:
            ``True`` when at least one role assigned to the user grants the
            permission, ``False`` otherwise or when no user is bound.

        Raises:
            Exception: If the query (or reading its result) fails.
        """
        if self.user_id is None:
            return False

        sql = """
            SELECT COUNT(*) AS count
            FROM user_role_assignments ura
            JOIN role_permissions rp ON ura.role_id = rp.role_id
            JOIN user_permissions up ON rp.permission_id = up.id
            WHERE ura.user_id = $1 AND up.name = $2
        """
        try:
            result = await db.execute_secured_query(
                sql,
                self.user_id,
                permission_name,
                fetch_method='fetchrow',
            )
            # The lookup stays inside the try block so that an unexpected
            # result shape is reported through the same wrapped error.
            return result['count'] > 0
        except Exception as e:
            raise Exception(f'Failed to check permission: {e}') from e

    async def get_user_permissions(self) -> List[Dict[str, Any]]:
        """
        Return the distinct permissions (name, description) of the bound user.

        Returns:
            The query result ordered by permission name, or ``[]`` when no
            user is bound. The row type is whatever
            ``db.execute_secured_query`` yields for its default fetch method.

        Raises:
            Exception: If the query fails.
        """
        if self.user_id is None:
            return []

        sql = """
            SELECT DISTINCT up.name, up.description
            FROM user_role_assignments ura
            JOIN role_permissions rp ON ura.role_id = rp.role_id
            JOIN user_permissions up ON rp.permission_id = up.id
            WHERE ura.user_id = $1
            ORDER BY up.name
        """
        try:
            return await db.execute_secured_query(sql, self.user_id)
        except Exception as e:
            raise Exception(f'Failed to get user permissions: {e}') from e

    async def get_user_roles(self) -> List[Dict[str, Any]]:
        """
        Return the roles (id, name, description) assigned to the bound user.

        Returns:
            The query result ordered by role name, or ``[]`` when no user is
            bound.

        Raises:
            Exception: If the query fails.
        """
        if self.user_id is None:
            return []

        sql = """
            SELECT r.id, r.name, r.description
            FROM user_role_assignments ura
            JOIN user_roles r ON ura.role_id = r.id
            WHERE ura.user_id = $1
            ORDER BY r.name
        """
        try:
            return await db.execute_secured_query(sql, self.user_id)
        except Exception as e:
            raise Exception(f'Failed to get user roles: {e}') from e

    async def assign_role(self, role_id: int) -> None:
        """
        Insert a (user, role) row into ``user_role_assignments``.

        Args:
            role_id: Id of the role to assign to the bound user.

        Raises:
            Exception: ``'No user specified'`` when no user is bound, or a
                ``'Failed to assign role: ...'`` error if the insert fails.
        """
        if self.user_id is None:
            raise Exception('No user specified')

        sql = "INSERT INTO user_role_assignments (user_id, role_id) VALUES ($1, $2)"
        try:
            await db.execute_secured_query(
                sql,
                self.user_id,
                role_id,
                fetch_method='execute',
            )
        except Exception as e:
            raise Exception(f'Failed to assign role: {e}') from e

    async def remove_role(self, role_id: int) -> None:
        """
        Delete the (user, role) row from ``user_role_assignments``.

        Args:
            role_id: Id of the role to take away from the bound user.

        Raises:
            Exception: ``'No user specified'`` when no user is bound, or a
                ``'Failed to remove role: ...'`` error if the delete fails.
        """
        if self.user_id is None:
            raise Exception('No user specified')

        sql = "DELETE FROM user_role_assignments WHERE user_id = $1 AND role_id = $2"
        try:
            await db.execute_secured_query(
                sql,
                self.user_id,
                role_id,
                fetch_method='execute',
            )
        except Exception as e:
            raise Exception(f'Failed to remove role: {e}') from e

    async def get_all_roles(self) -> List[Dict[str, Any]]:
        """
        Return every row of ``user_roles`` (id, name, description) by name.

        Does not depend on the bound user.

        Raises:
            Exception: If the query fails.
        """
        sql = "SELECT id, name, description FROM user_roles ORDER BY name"
        try:
            return await db.execute_secured_query(sql)
        except Exception as e:
            raise Exception(f'Failed to get roles: {e}') from e

    async def get_all_permissions(self) -> List[Dict[str, Any]]:
        """
        Return every row of ``user_permissions`` (id, name, description) by name.

        Does not depend on the bound user.

        Raises:
            Exception: If the query fails.
        """
        sql = "SELECT id, name, description FROM user_permissions ORDER BY name"
        try:
            return await db.execute_secured_query(sql)
        except Exception as e:
            raise Exception(f'Failed to get permissions: {e}') from e

    async def create_role(self, name: str, description: str) -> int:
        """
        Insert a new row into ``user_roles`` and return its generated id.

        Args:
            name: Role name.
            description: Role description.

        Returns:
            The ``id`` column returned by ``INSERT ... RETURNING id``.

        Raises:
            Exception: If the insert (or reading the returned row) fails.
        """
        sql = "INSERT INTO user_roles (name, description) VALUES ($1, $2) RETURNING id"
        try:
            result = await db.execute_secured_query(
                sql,
                name,
                description,
                fetch_method='fetchrow',
            )
            return result['id']
        except Exception as e:
            raise Exception(f'Failed to create role: {e}') from e

    async def update_role_permissions(self, role_id: int, permission_ids: List[int]) -> None:
        """
        Replace the permission set of a role.

        All existing ``role_permissions`` rows of the role are deleted, then
        one row per distinct id in ``permission_ids`` is inserted, in first
        occurrence order. An empty (or ``None``) list leaves the role with no
        permissions.

        NOTE(review): the delete and the inserts are separate
        ``execute_secured_query`` calls. Unless that function runs them in a
        surrounding transaction (not visible from here - confirm), a failing
        insert leaves the role with only part of the requested permissions.

        Args:
            role_id: Id of the role to update.
            permission_ids: Ids of the permissions the role should have.

        Raises:
            Exception: If the input cannot be processed or any statement fails.
        """
        try:
            # Normalise BEFORE the destructive delete: repeated ids are
            # collapsed (order kept) so they cannot cause duplicate rows or a
            # mid-loop constraint failure, and a malformed argument is
            # rejected while the old permissions are still in place.
            unique_ids = list(dict.fromkeys(permission_ids or ()))

            sql_delete = "DELETE FROM role_permissions WHERE role_id = $1"
            await db.execute_secured_query(sql_delete, role_id, fetch_method='execute')

            sql_insert = "INSERT INTO role_permissions (role_id, permission_id) VALUES ($1, $2)"
            for permission_id in unique_ids:
                await db.execute_secured_query(
                    sql_insert,
                    role_id,
                    permission_id,
                    fetch_method='execute',
                )
        except Exception as e:
            raise Exception(f'Failed to update role permissions: {e}') from e

    async def get_role_permissions(self, role_id: int) -> List[Dict[str, Any]]:
        """
        Return the permissions (id, name, description) granted to a role.

        Args:
            role_id: Id of the role to inspect.

        Returns:
            The query result ordered by permission name.

        Raises:
            Exception: If the query fails.
        """
        sql = """
            SELECT p.id, p.name, p.description
            FROM role_permissions rp
            JOIN user_permissions p ON rp.permission_id = p.id
            WHERE rp.role_id = $1
            ORDER BY p.name
        """
        try:
            return await db.execute_secured_query(sql, role_id)
        except Exception as e:
            raise Exception(f'Failed to get role permissions: {e}') from e
|
|