Files
controles/functions/decorators.py
Mateus Tavares 2b1668206d - inits centralizados, READMEs atualizados
- padronizando o nome de get_db_connection e session para get_db_session, para não confundir com session do Flask ou sessoes web

- corrigindo potenciais erros

-- has_permission nao consegue com lazy load carregar permission depois de load_user fechar a conexao, entao joinedLoad com Permission antes de fechar

-- db.rollback não existe caso db = get_db_session() apareça muito depois dentro do try, padronizando antes de try

--- comparar role por nivel (Role.SECRETARIO_GERAL) e nao por nome ("Secretario Geral")

- unificacao de get_otp_qr_code

- mudança de nowutc() para now(UTC) conforme novo padrão
2026-02-20 17:19:15 -03:00

193 lines
8.2 KiB
Python

from functools import wraps
from flask import session, redirect, url_for, flash
from flask_login import current_user, login_required
from sqlalchemy.orm import joinedload
from .database import get_db_session, Usuario, Role
from .rbac import Permission
def require_login(f):
"""Decorador para verificar se o usuário está logado"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Por favor, faça login para acessar esta página.', 'danger')
return redirect(url_for('auth.login'))
# Executar a função diretamente sem try/catch
return f(*args, **kwargs)
return decorated_function
def require_permission(permission_name):
"""Decorador para verificar se o usuário tem uma permissão específica"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_session()
try:
# Carregar o usuário com suas roles e permissões
user = db.query(Usuario).options(
joinedload(Usuario.roles).joinedload(Role.permissions),
joinedload(Usuario.militante),
joinedload(Usuario.cr),
joinedload(Usuario.setor),
joinedload(Usuario.celula)
).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('auth.login'))
if not user.has_permission(permission_name):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
# Substituir o current_user pelo usuário carregado
setattr(current_user, '_get_current_object', lambda: user)
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_role(role_level):
"""Decorador para verificar se o usuário tem um papel específico"""
if not isinstance(role_level, int):
raise TypeError("require_role espera um nível numérico (int), use a classe Role.")
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_session()
try:
user = db.query(Usuario).get(current_user.id)
if not user or not user.has_role(role_level):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_minimum_role(min_level):
"""Decorador para verificar se o usuário tem um papel com nível mínimo"""
if not isinstance(min_level, int):
raise TypeError("require_minimum_role espera um nível numérico de role (int).")
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_session()
try:
user = db.query(Usuario).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('auth.login'))
if not user.has_minimum_role(min_level):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_instance_permission(permission_name, instance_param):
"""Decorator para verificar se o usuário tem permissão em uma instância específica"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
# Obtém o ID da instância dos argumentos da função
instance_id = kwargs.get(instance_param)
if instance_id is None:
flash('ID da instância não encontrado.', 'error')
return redirect(url_for('home'))
if not current_user.has_instance_permission(permission_name, instance_id):
flash('Você não tem permissão para acessar esta instância.', 'error')
return redirect(url_for('home'))
return f(*args, **kwargs)
return decorated_function
return decorator
def require_instance_access(instance_type, instance_id):
"""Decorator para verificar se o usuário tem acesso a uma instância específica"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_session()
try:
user = db.query(Usuario).options(
joinedload(Usuario.roles).joinedload(Role.permissions)
).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('auth.login'))
# Verificar acesso baseado na instância do usuário
if instance_type == 'celula':
if not (user.celula_id == instance_id or
user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a esta célula.', 'error')
return redirect(url_for('index'))
elif instance_type == 'setor':
if not (user.setor_id == instance_id or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este setor.', 'error')
return redirect(url_for('index'))
elif instance_type == 'cr':
if not (user.cr_id == instance_id or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este CR.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator