Отказ от ответственности и извинительные слова. Прошло довольно много времени с тех пор, как я не задаю здесь вопросы, а также я полный новичок в FastAPI, так что... пожалуйста, не судите слишком строго.
Я играю с авторизацией FastAPI и задаюсь вопросом, как я могу защитить свои маршруты от пользователей, которые прошли аутентификацию, но не имеют разрешения для этих конкретных маршрутов.
Я написал код, который решает эту проблему.
Вот мои маршруты:
@router.get('/student_route_only')
@role_required(allowed_roles=[UserRole(name='student')])
async def student_route_only(
token: Annotated[str, Depends(oauth2_scheme)],
auth_service: AuthService = Depends(get_auth_service),
user_data: UserAuthResponse = None,
):
return UserAuthResponse(
user_id=user_data.user_id,
role=user_data.role,
name=user_data.name,
)
@router.get('/routes_for_student_and_admin')
@role_required(allowed_roles=[UserRole(name='student'), UserRole(name='admin')])
async def routes_for_student_and_admin(
token: Annotated[str, Depends(oauth2_scheme)],
auth_service: AuthService = Depends(get_auth_service),
user_data: UserAuthResponse = None,
):
return UserAuthResponse(
user_id=user_data.user_id,
role=user_data.role,
name=user_data.name,
)
Это моя пидантическая модель:
class UserAuthResponse(BaseModel):
user_id: int
role: str
name: str
class UserRole(BaseModel):
name: str
@validator('name')
def name_must_be_valid(cls, value):
allowed_roles = ['admin', 'student', 'teacher']
if value.lower() not in allowed_roles:
raise ValueError(
f"Invalid role. Allowed roles are: {', '.join(allowed_roles)}"
)
return value
и это мой декоратор, который выполняет эту работу:
def role_required(allowed_roles: list[UserRole]):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
auth_service = kwargs.get('auth_service')
token = kwargs.get('token')
user_data = await auth_service.get_current_user(token)
if not user_data or user_data.role not in [role.name for role in allowed_roles]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='This operation is forbidden for you',
)
kwargs['user_data'] = user_data # pushing gotten user_data back
return await func(*args, **kwargs)
return wrapper
return decorator
Я искал относительно простое решение для защиты ролевого маршрута в FastAPI, но не нашел чего-то, что было бы несложно реализовать.
Итак, я хотел бы спросить вас, можно ли использовать такой код в производстве?
Потому что я думаю, что его довольно просто использовать, например, если мне нужно защитить какой-то маршрут и сделать его доступным, скажем, только для администратора, я могу просто сделать это:
@router.get('/admin_route_only')
@role_required(allowed_roles=[UserRole(name='admin')]) # and that's it. The decorator does the rest of the job
async def admin_route_only(
token: Annotated[str, Depends(oauth2_scheme)],
auth_service: AuthService = Depends(get_auth_service),
user_data: UserAuthResponse = None,
):
return UserAuthResponse(
user_id=user_data.user_id,
role=user_data.role,
name=user_data.name,
)
С другой стороны, моему PyCharm не нравится, что auth_service
и token
в функции не используются (но они нужны в декораторе). Подойдет ли это другим разработчикам? А для линтеров?
А также можно ли таким образом делегировать авторизацию декоратору, а затем отправлять пользовательские данные обратно через kwargs..?
Заранее большое спасибо!
Для этой цели можно и удобно использовать Зависимости.
Вы можете реализовать зависимость как класс.
class Authorization:
def __init__(self, allowed_roles: list[UserRole]):
self.allowed_roles = allowed_roles
def __call__(
self,
token: : Annotated[str, Depends(oauth2_scheme),
auth_service: AuthService = Depends(get_auth_service),
):
user_data = await auth_service.get_current_user(token)
if not user_data or user_data.role not in [role.name for role in self.allowed_roles]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='This operation is forbidden for you',
)
return user_data
И затем ваши маршруты:
@router.get('/routes_for_student_and_admin')
async def routes_for_student_and_admin(
user_data: Annotated[UserAuthResponse, Depends(Authorization(allowed_roles=[UserRole(name='student')]))],
):
return user_data
Аналогичным образом реализуйте другие конечные точки с другим списком UserRoles. Вероятно, для запуска этого кода потребуются небольшие изменения. Но я надеюсь, что вы нашли подход!
Великолепно! Спасибо!
Отказ от ответственности: я не самый опытный человек, когда дело касается fastapi, и решение, предложенное Дмитрием, кажется, больше соответствует способу fastapi.
Тем не менее, если вы хотите придерживаться своего подхода с использованием декораторов, небольшая адаптация следующего кода должна помочь:
def require_permission(permission: str) -> Callable:
def decorator(func) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs) -> Callable:
try:
request = kwargs.pop("zz_permission_request",
next(arg for _, arg in kwargs.items() if isinstance(arg, Request)))
except StopIteration:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail='Permission check failed.',
)
auth_header = request.headers.get('Authorization')
scheme, _, token = auth_header.partition(' ')
if not auth_header or scheme.lower() != "bearer":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail = "Not authenticated",
headers = {"WWW-Authenticate": "Bearer"},
)
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM], verify=True)
token_data = TokenData.model_validate(payload)
available_permissions = set(token_data.permissions).union(resolve_roles(token_data.roles))
if not match_permission(permission, available_permissions):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail='missing permissions',
)
if inspect.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
# if the Request is not already part of the wrapped func signature we want to add it
sig = inspect.signature(func).parameters
if not any(True for param in sig.values() if param.annotation == Request):
wrapper.__signature__ = inspect.Signature(
parameters=[
# Use all parameters from func
*sig.values(),
Parameter('zz_permission_request', Parameter.KEYWORD_ONLY, annotation=Request)
],
return_annotation=inspect.signature(func).return_annotation,
)
return wrapper
return decorator
Это все еще очень грубо, но по сути это то, что при необходимости манипулирует исходной сигнатурой функции, чтобы fastapi внедрил запрос. Имея доступный запрос, вы можете извлечь токен и проверить свои разрешения.
FastAPI использует Starlette под капотом. Вы можете использовать встроенную
AuthenticationMiddleware
Аутентификацию Starlette