Role_required декоратор для маршрута FastAPI

Отказ от ответственности и извинительные слова. Прошло довольно много времени с тех пор, как я не задаю здесь вопросы, а также я полный новичок в FastAPI, так что... пожалуйста, не судите слишком строго.

Я играю с авторизацией FastAPI и задаюсь вопросом, как я могу защитить свои маршруты от пользователей, которые прошли аутентификацию, но не имеют разрешения для этих конкретных маршрутов.

Я написал код, который решает эту проблему.

Вот мои маршруты:

@router.get('/student_route_only')
@role_required(allowed_roles=[UserRole(name='student')])
async def student_route_only(
    token: Annotated[str, Depends(oauth2_scheme)],
    auth_service: AuthService = Depends(get_auth_service),
    user_data: UserAuthResponse = None,
):
    return UserAuthResponse(
        user_id=user_data.user_id,
        role=user_data.role,
        name=user_data.name,
    )


@router.get('/routes_for_student_and_admin')
@role_required(allowed_roles=[UserRole(name='student'), UserRole(name='admin')])
async def routes_for_student_and_admin(
    token: Annotated[str, Depends(oauth2_scheme)],
    auth_service: AuthService = Depends(get_auth_service),
    user_data: UserAuthResponse = None,
):
    return UserAuthResponse(
        user_id=user_data.user_id,
        role=user_data.role,
        name=user_data.name,
    )

Это моя пидантическая модель:

class UserAuthResponse(BaseModel):
    user_id: int
    role: str
    name: str

class UserRole(BaseModel):
    name: str

    @validator('name')
    def name_must_be_valid(cls, value):
        allowed_roles = ['admin', 'student', 'teacher']
        if value.lower() not in allowed_roles:
            raise ValueError(
                f"Invalid role. Allowed roles are: {', '.join(allowed_roles)}"
            )
        return value

и это мой декоратор, который выполняет эту работу:

def role_required(allowed_roles: list[UserRole]):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            auth_service = kwargs.get('auth_service')
            token = kwargs.get('token')
            user_data = await auth_service.get_current_user(token)
            if not user_data or user_data.role not in [role.name for role in allowed_roles]:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail='This operation is forbidden for you',
                )
            kwargs['user_data'] = user_data  # pushing gotten user_data back
            return await func(*args, **kwargs)

        return wrapper

    return decorator

Я искал относительно простое решение для защиты ролевого маршрута в FastAPI, но не нашел чего-то, что было бы несложно реализовать.

Итак, я хотел бы спросить вас, можно ли использовать такой код в производстве?

Потому что я думаю, что его довольно просто использовать, например, если мне нужно защитить какой-то маршрут и сделать его доступным, скажем, только для администратора, я могу просто сделать это:

@router.get('/admin_route_only')
@role_required(allowed_roles=[UserRole(name='admin')])  # and that's it. The decorator does the rest of the job
async def admin_route_only(
    token: Annotated[str, Depends(oauth2_scheme)],
    auth_service: AuthService = Depends(get_auth_service),
    user_data: UserAuthResponse = None,
):
    return UserAuthResponse(
        user_id=user_data.user_id,
        role=user_data.role,
        name=user_data.name,
    )

С другой стороны, моему PyCharm не нравится, что auth_service и token в функции не используются (но они нужны в декораторе). Подойдет ли это другим разработчикам? А для линтеров?

А также можно ли таким образом делегировать авторизацию декоратору, а затем отправлять пользовательские данные обратно через kwargs..?

Заранее большое спасибо!

FastAPI использует Starlette под капотом. Вы можете использовать встроенную AuthenticationMiddlewareАутентификацию Starlette

PGHE 15.08.2024 02:02
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
1
84
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Для этой цели можно и удобно использовать Зависимости.

Вы можете реализовать зависимость как класс.

class Authorization:

    def __init__(self, allowed_roles: list[UserRole]):
        self.allowed_roles = allowed_roles

    def __call__(
        self,
        token: : Annotated[str, Depends(oauth2_scheme),
        auth_service: AuthService = Depends(get_auth_service),
    ):
        user_data = await auth_service.get_current_user(token)
        if not user_data or user_data.role not in [role.name for role in self.allowed_roles]:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail='This operation is forbidden for you',
            )

        return user_data

И затем ваши маршруты:

@router.get('/routes_for_student_and_admin')
async def routes_for_student_and_admin(
    user_data: Annotated[UserAuthResponse, Depends(Authorization(allowed_roles=[UserRole(name='student')]))],
):
    return user_data

Аналогичным образом реализуйте другие конечные точки с другим списком UserRoles. Вероятно, для запуска этого кода потребуются небольшие изменения. Но я надеюсь, что вы нашли подход!

Великолепно! Спасибо!

John 20.08.2024 12:06

Отказ от ответственности: я не самый опытный человек, когда дело касается fastapi, и решение, предложенное Дмитрием, кажется, больше соответствует способу fastapi.

Тем не менее, если вы хотите придерживаться своего подхода с использованием декораторов, небольшая адаптация следующего кода должна помочь:

def require_permission(permission: str) -> Callable:
    def decorator(func) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Callable:
            try:
                request = kwargs.pop("zz_permission_request",
                                 next(arg for _, arg in kwargs.items() if isinstance(arg, Request)))
            except StopIteration:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail='Permission check failed.',
                )
            auth_header = request.headers.get('Authorization')
            scheme, _, token = auth_header.partition(' ')
            if not auth_header or scheme.lower() != "bearer":
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail = "Not authenticated",
                    headers = {"WWW-Authenticate": "Bearer"},
                )
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM], verify=True)
            token_data = TokenData.model_validate(payload)
            available_permissions = set(token_data.permissions).union(resolve_roles(token_data.roles))
            if not match_permission(permission, available_permissions):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail='missing permissions',
                )
            if inspect.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)

        # if the Request is not already part of the wrapped func signature we want to add it
        sig = inspect.signature(func).parameters
        if not any(True for param in sig.values() if param.annotation == Request):
            wrapper.__signature__ = inspect.Signature(
                parameters=[
                    # Use all parameters from func
                    *sig.values(),
                    Parameter('zz_permission_request', Parameter.KEYWORD_ONLY, annotation=Request)
                ],
                return_annotation=inspect.signature(func).return_annotation,
                )
        return wrapper
    return decorator

Это все еще очень грубо, но по сути это то, что при необходимости манипулирует исходной сигнатурой функции, чтобы fastapi внедрил запрос. Имея доступный запрос, вы можете извлечь токен и проверить свои разрешения.

Другие вопросы по теме

Проблема несоответствия пароля с bcryptjs в Angular, NodeJS и MySQL
Укажите срок действия при запросе токена доступа при входе в Microsoft oauth2 v2.0
Кто-нибудь знает, почему моя переменная (x) изменила свое значение с 3 на 0 (1 после x++) в этой программе? Я пытаюсь создать систему регистрации, но она перезаписывает
Можете ли вы создать «Потоки пользователей» в Azure с помощью учетной записи с оплатой по мере использования?
Я пытаюсь создать систему входа в систему на языке C, которая скрывает пароль с помощью '*' при вводе и возвращается назад, когда пользователь вводит неверную информацию
Oauth2-proxy `/oauth2/auth` возвращает 401 для действительных токенов JWT
Реализация криптохэша – Node.js 22 – Angular 18
При попытке аутентификации Firebase мне сообщается, что сначала мне нужно пройти аутентификацию Firebase. Хм?
Измените конфигурацию для регистрации приложения, чем для корпоративного приложения
Аутентификация Windows не работает через туннель разработки