2025-04-03 15:58:07 -03:00
|
|
|
from functools import wraps
|
|
|
|
|
from flask import session, redirect, url_for, flash
|
|
|
|
|
from flask_login import current_user, login_required
|
|
|
|
|
from sqlalchemy.orm import joinedload
|
2025-04-13 22:30:05 -03:00
|
|
|
from .database import get_db_connection, Usuario, Role
|
2025-04-03 15:58:07 -03:00
|
|
|
from .rbac import Permission
|
|
|
|
|
|
|
|
|
|
def require_login(f):
|
|
|
|
|
"""Decorador para verificar se o usuário está logado"""
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def decorated_function(*args, **kwargs):
|
|
|
|
|
if not current_user.is_authenticated:
|
2025-04-03 20:58:02 -03:00
|
|
|
flash('Por favor, faça login para acessar esta página.', 'danger')
|
2025-04-03 15:58:07 -03:00
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
db = get_db_connection()
|
|
|
|
|
try:
|
2025-04-13 22:30:05 -03:00
|
|
|
# Carregar o usuário com suas roles e permissões
|
2025-04-03 15:58:07 -03:00
|
|
|
user = db.query(Usuario).options(
|
2025-04-13 22:30:05 -03:00
|
|
|
joinedload(Usuario.roles).joinedload(Role.permissions),
|
|
|
|
|
joinedload(Usuario.militante),
|
|
|
|
|
joinedload(Usuario.cr),
|
|
|
|
|
joinedload(Usuario.setor),
|
|
|
|
|
joinedload(Usuario.celula)
|
2025-04-03 15:58:07 -03:00
|
|
|
).get(current_user.id)
|
|
|
|
|
|
|
|
|
|
if not user:
|
2025-04-03 20:58:02 -03:00
|
|
|
flash('Usuário não encontrado.', 'danger')
|
2025-04-03 15:58:07 -03:00
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
# Atualiza timestamp da última atividade
|
|
|
|
|
user.update_last_activity()
|
|
|
|
|
db.commit()
|
|
|
|
|
|
2025-04-13 22:30:05 -03:00
|
|
|
# Substituir o current_user pelo usuário carregado
|
|
|
|
|
setattr(current_user, '_get_current_object', lambda: user)
|
|
|
|
|
|
|
|
|
|
# Executar a função com o usuário carregado
|
2025-04-03 15:58:07 -03:00
|
|
|
return f(*args, **kwargs)
|
2025-04-13 22:30:05 -03:00
|
|
|
except Exception as e:
|
|
|
|
|
db.rollback()
|
|
|
|
|
flash('Erro ao carregar dados do usuário.', 'danger')
|
|
|
|
|
return redirect(url_for('login'))
|
2025-04-03 15:58:07 -03:00
|
|
|
finally:
|
|
|
|
|
db.close()
|
|
|
|
|
return decorated_function
|
|
|
|
|
|
|
|
|
|
def require_permission(permission_name):
|
|
|
|
|
"""Decorador para verificar se o usuário tem uma permissão específica"""
|
|
|
|
|
def decorator(f):
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def decorated_function(*args, **kwargs):
|
|
|
|
|
if not current_user.is_authenticated:
|
2025-04-13 22:30:05 -03:00
|
|
|
flash('Você precisa estar logado para acessar esta página.', 'error')
|
2025-04-03 15:58:07 -03:00
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
2025-04-13 22:30:05 -03:00
|
|
|
db = get_db_connection()
|
|
|
|
|
try:
|
|
|
|
|
# Carregar o usuário com suas roles e permissões
|
|
|
|
|
user = db.query(Usuario).options(
|
|
|
|
|
joinedload(Usuario.roles).joinedload(Role.permissions),
|
|
|
|
|
joinedload(Usuario.militante),
|
|
|
|
|
joinedload(Usuario.cr),
|
|
|
|
|
joinedload(Usuario.setor),
|
|
|
|
|
joinedload(Usuario.celula)
|
|
|
|
|
).get(current_user.id)
|
|
|
|
|
|
|
|
|
|
if not user:
|
|
|
|
|
flash('Usuário não encontrado.', 'error')
|
|
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
if not user.has_permission(permission_name):
|
|
|
|
|
flash('Você não tem permissão para acessar esta página.', 'error')
|
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
|
|
|
|
# Atualiza timestamp da última atividade
|
|
|
|
|
user.update_last_activity()
|
|
|
|
|
db.commit()
|
|
|
|
|
|
|
|
|
|
# Substituir o current_user pelo usuário carregado
|
|
|
|
|
setattr(current_user, '_get_current_object', lambda: user)
|
|
|
|
|
|
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
|
finally:
|
|
|
|
|
db.close()
|
2025-04-03 15:58:07 -03:00
|
|
|
return decorated_function
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
|
def require_role(role_name):
|
|
|
|
|
"""Decorador para verificar se o usuário tem um papel específico"""
|
|
|
|
|
def decorator(f):
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def decorated_function(*args, **kwargs):
|
|
|
|
|
if not current_user.is_authenticated:
|
|
|
|
|
flash('Você precisa estar logado para acessar esta página.', 'error')
|
|
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
db = get_db_connection()
|
|
|
|
|
try:
|
|
|
|
|
user = db.query(Usuario).get(current_user.id)
|
|
|
|
|
if not user or not user.has_role(role_name):
|
|
|
|
|
flash('Você não tem permissão para acessar esta página.', 'error')
|
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
|
|
|
|
# Atualiza timestamp da última atividade
|
|
|
|
|
user.update_last_activity()
|
|
|
|
|
db.commit()
|
|
|
|
|
|
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
|
finally:
|
|
|
|
|
db.close()
|
|
|
|
|
return decorated_function
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
|
def require_minimum_role(min_level):
|
|
|
|
|
"""Decorador para verificar se o usuário tem um papel com nível mínimo"""
|
|
|
|
|
def decorator(f):
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def decorated_function(*args, **kwargs):
|
|
|
|
|
if not current_user.is_authenticated:
|
|
|
|
|
flash('Você precisa estar logado para acessar esta página.', 'error')
|
|
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
db = get_db_connection()
|
|
|
|
|
try:
|
|
|
|
|
user = db.query(Usuario).get(current_user.id)
|
|
|
|
|
if not user:
|
|
|
|
|
flash('Usuário não encontrado.', 'error')
|
|
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
|
|
|
|
highest_role = user.get_highest_role()
|
|
|
|
|
if not highest_role or highest_role.nivel < min_level:
|
|
|
|
|
flash('Você não tem permissão para acessar esta página.', 'error')
|
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
|
|
|
|
# Atualiza timestamp da última atividade
|
|
|
|
|
user.update_last_activity()
|
|
|
|
|
db.commit()
|
|
|
|
|
|
|
|
|
|
return f(*args, **kwargs)
|
|
|
|
|
finally:
|
|
|
|
|
db.close()
|
|
|
|
|
return decorated_function
|
|
|
|
|
return decorator
|
|
|
|
|
|
2025-04-03 17:10:43 -03:00
|
|
|
def require_instance_permission(permission_name, instance_param):
|
2025-04-03 15:58:07 -03:00
|
|
|
"""Decorator para verificar se o usuário tem permissão em uma instância específica"""
|
|
|
|
|
def decorator(f):
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def decorated_function(*args, **kwargs):
|
2025-04-03 17:10:43 -03:00
|
|
|
if not current_user.is_authenticated:
|
|
|
|
|
flash('Por favor, faça login para acessar esta página.', 'error')
|
2025-04-03 15:58:07 -03:00
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
2025-04-03 17:10:43 -03:00
|
|
|
# Obtém o ID da instância dos argumentos da função
|
|
|
|
|
instance_id = kwargs.get(instance_param)
|
|
|
|
|
if instance_id is None:
|
|
|
|
|
flash('ID da instância não encontrado.', 'error')
|
|
|
|
|
return redirect(url_for('home'))
|
|
|
|
|
|
|
|
|
|
if not current_user.has_instance_permission(permission_name, instance_id):
|
|
|
|
|
flash('Você não tem permissão para acessar esta instância.', 'error')
|
|
|
|
|
return redirect(url_for('home'))
|
|
|
|
|
|
|
|
|
|
return f(*args, **kwargs)
|
2025-04-03 15:58:07 -03:00
|
|
|
return decorated_function
|
|
|
|
|
return decorator
|
|
|
|
|
|
|
|
|
|
def require_instance_access(instance_type, instance_id):
|
|
|
|
|
"""Decorator para verificar se o usuário tem acesso a uma instância específica"""
|
|
|
|
|
def decorator(f):
|
|
|
|
|
@wraps(f)
|
|
|
|
|
def decorated_function(*args, **kwargs):
|
2025-04-03 17:10:43 -03:00
|
|
|
if not current_user.is_authenticated:
|
|
|
|
|
flash('Por favor, faça login para acessar esta página.', 'error')
|
2025-04-03 15:58:07 -03:00
|
|
|
return redirect(url_for('login'))
|
|
|
|
|
|
2025-04-03 17:10:43 -03:00
|
|
|
# Verificar acesso baseado na instância do usuário
|
|
|
|
|
if instance_type == 'celula':
|
|
|
|
|
if not (current_user.celula_id == instance_id or
|
|
|
|
|
current_user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
|
|
|
|
|
current_user.has_permission(Permission.VIEW_CR_REPORTS) or
|
|
|
|
|
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
|
|
|
|
|
flash('Você não tem acesso a esta célula.', 'error')
|
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
elif instance_type == 'setor':
|
|
|
|
|
if not (current_user.setor_id == instance_id or
|
|
|
|
|
current_user.has_permission(Permission.VIEW_CR_REPORTS) or
|
|
|
|
|
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
|
|
|
|
|
flash('Você não tem acesso a este setor.', 'error')
|
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
elif instance_type == 'cr':
|
|
|
|
|
if not (current_user.cr_id == instance_id or
|
|
|
|
|
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
|
|
|
|
|
flash('Você não tem acesso a este CR.', 'error')
|
|
|
|
|
return redirect(url_for('index'))
|
|
|
|
|
|
|
|
|
|
# Atualiza timestamp da última atividade
|
|
|
|
|
current_user.update_last_activity()
|
|
|
|
|
db_session.commit()
|
|
|
|
|
|
|
|
|
|
return f(*args, **kwargs)
|
2025-04-03 15:58:07 -03:00
|
|
|
return decorated_function
|
|
|
|
|
return decorator
|