fix: corrige uso do decorator require_instance_permission - Modifica o decorator para aceitar o nome do parâmetro da instância - Atualiza as rotas de pagamentos para usar o decorator corretamente - Adiciona verificação do ID da instância nos argumentos da função - Melhora mensagens de erro para casos de permissão negada

This commit is contained in:
LS
2025-04-03 17:10:43 -03:00
parent cbaf227e58
commit 9d17c66c46
2 changed files with 241 additions and 147 deletions

255
app.py
View File

@@ -20,6 +20,7 @@ from functions.database import (
ComiteCentral, ComiteCentral,
EmailMilitante, EmailMilitante,
init_database, init_database,
EstadoMilitante,
) )
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, joinedload from sqlalchemy.orm import sessionmaker, joinedload
@@ -131,11 +132,11 @@ def login_required(f):
def session_timeout(f): def session_timeout(f):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if 'user_id' not in session: if current_user.is_authenticated:
return redirect(url_for('login')) if 'last_activity' not in session:
session['last_activity'] = time()
return f(*args, **kwargs)
# Verificar se existe timestamp do último acesso
if 'last_activity' in session:
last_activity = datetime.fromtimestamp(session['last_activity']) last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now() now = datetime.now()
@@ -144,11 +145,9 @@ def session_timeout(f):
if now - last_activity > timedelta(minutes=timeout_minutes): if now - last_activity > timedelta(minutes=timeout_minutes):
# Registrar o logout por timeout # Registrar o logout por timeout
try: try:
user = db_session.query(Usuario).get(session['user_id']) current_user.ultimo_logout = datetime.now()
if user: current_user.motivo_logout = "Timeout de sessão"
user.ultimo_logout = datetime.now() db_session.commit()
user.motivo_logout = "Timeout de sessão"
db_session.commit()
except Exception as e: except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}") print(f"Erro ao registrar logout por timeout: {e}")
@@ -156,18 +155,17 @@ def session_timeout(f):
flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning')
return redirect(url_for('login')) return redirect(url_for('login'))
# Atualizar timestamp de último acesso # Atualizar timestamp de último acesso
session['last_activity'] = time() session['last_activity'] = time()
# Atualizar também no banco de dados # Atualizar também no banco de dados
try: try:
user = db_session.query(Usuario).get(session['user_id']) current_user.update_last_activity()
if user:
user.update_last_activity()
db_session.commit() db_session.commit()
except Exception as e: except Exception as e:
print(f"Erro ao atualizar última atividade: {e}") print(f"Erro ao atualizar última atividade: {e}")
return f(*args, **kwargs)
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated_function return decorated_function
@@ -357,8 +355,35 @@ def criar_militante():
@require_permission(Permission.VIEW_CELL_DATA) @require_permission(Permission.VIEW_CELL_DATA)
def listar_militantes(): def listar_militantes():
"""Lista todos os militantes""" """Lista todos os militantes"""
militantes = db_session.query(Militante).all() db = get_db_connection()
return render_template("listar_militantes.html", militantes=militantes) try:
# Adicionar opção de filtro por estado
estado = request.args.get('estado', 'todos')
query = db.query(Militante)
if estado != 'todos':
query = query.filter(Militante.estado == estado)
militantes = query.all()
# Precomputar todas as responsabilidades antes de renderizar
for militante in militantes:
militante.is_responsavel_financas = bool(militante.responsabilidades & Militante.RESPONSAVEL_FINANCAS)
militante.is_responsavel_imprensa = bool(militante.responsabilidades & Militante.RESPONSAVEL_IMPRENSA)
militante.is_quadro_orientador = bool(militante.responsabilidades & Militante.QUADRO_ORIENTADOR)
return render_template(
"listar_militantes.html",
militantes=militantes,
estado_atual=estado,
estados=[
('todos', 'Todos'),
(EstadoMilitante.ATIVO.value, 'Ativos'),
(EstadoMilitante.DESLIGADO.value, 'Desligados')
]
)
finally:
db.close()
# Rota para criar uma nova cota mensal # Rota para criar uma nova cota mensal
@app.route("/cotas/novo", methods=["GET", "POST"]) @app.route("/cotas/novo", methods=["GET", "POST"])
@@ -398,8 +423,7 @@ def listar_cotas():
@app.route("/pagamentos/novo", methods=["GET", "POST"]) @app.route("/pagamentos/novo", methods=["GET", "POST"])
@require_login @require_login
def novo_pagamento(): def novo_pagamento():
# Verificar permissões do usuário user = current_user
user = db_session.query(Usuario).get(session['user_id'])
# Verificar se o usuário tem permissão para registrar pagamentos em alguma instância # Verificar se o usuário tem permissão para registrar pagamentos em alguma instância
if not (user.has_permission(Permission.REGISTER_CELL_PAYMENT) or if not (user.has_permission(Permission.REGISTER_CELL_PAYMENT) or
@@ -433,28 +457,17 @@ def novo_pagamento():
@app.route("/pagamentos") @app.route("/pagamentos")
@require_login @require_login
def listar_pagamentos(): def listar_pagamentos():
"""Lista todos os pagamentos""" user = current_user
user = db_session.query(Usuario).get(session['user_id'])
# Filtrar pagamentos baseado na instância do usuário # Filtrar pagamentos baseado na instância do usuário
if user.has_permission(Permission.REGISTER_CC_PAYMENT): if user.has_permission(Permission.REGISTER_CC_PAYMENT):
# Usuário do CC pode ver todos os pagamentos
pagamentos = db_session.query(Pagamento).all() pagamentos = db_session.query(Pagamento).all()
elif user.has_permission(Permission.REGISTER_CR_PAYMENT): elif user.has_permission(Permission.REGISTER_CR_PAYMENT):
# Usuário do CR pode ver pagamentos do CR e setores/células abaixo pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_cr_instances())).all()
pagamentos = db_session.query(Pagamento).filter(
Pagamento.cr_id == user.cr_id
).all()
elif user.has_permission(Permission.REGISTER_SECTOR_PAYMENT): elif user.has_permission(Permission.REGISTER_SECTOR_PAYMENT):
# Usuário do setor pode ver pagamentos do setor e células abaixo pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_sector_instances())).all()
pagamentos = db_session.query(Pagamento).filter(
Pagamento.setor_id == user.setor_id
).all()
elif user.has_permission(Permission.REGISTER_CELL_PAYMENT): elif user.has_permission(Permission.REGISTER_CELL_PAYMENT):
# Usuário da célula pode ver apenas pagamentos da célula pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_cell_instances())).all()
pagamentos = db_session.query(Pagamento).filter(
Pagamento.celula_id == user.celula_id
).all()
else: else:
flash('Você não tem permissão para visualizar pagamentos.', 'error') flash('Você não tem permissão para visualizar pagamentos.', 'error')
return redirect(url_for('home')) return redirect(url_for('home'))
@@ -668,27 +681,36 @@ def editar_militante(id):
@app.route('/dash') @app.route('/dash')
@require_login @require_login
def dashboard_admin(): def dashboard_admin():
# Filtrar usuários baseado na hierarquia """
if current_user.has_permission('system_config'): Rota para o dashboard administrativo
# Secretário Geral e Secretário de Organização podem ver tudo """
users = db_session.query(Usuario).all() db = get_db_connection()
elif current_user.has_permission('manage_cc_crs'): try:
# Membro do CC pode ver tudo # Carregar usuários com seus relacionamentos
users = db_session.query(Usuario).all() users = db.query(Usuario).options(
elif current_user.has_permission('manage_cr_sectors'): joinedload(Usuario.roles),
# Secretário de CR pode ver apenas membros do seu CR joinedload(Usuario.militante)
users = db_session.query(Usuario).join(Usuario.roles).filter(Role.nome == 'membro_setor').all() ).all()
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor pode ver apenas membros do seu setor
users = db_session.query(Usuario).join(Usuario.roles).filter(Role.nome == 'membro_celula').all()
elif current_user.has_permission('manage_cell_members'):
# Secretário de Célula pode ver apenas membros da sua célula
users = db_session.query(Usuario).join(Usuario.roles).filter(Role.nome == 'militante_basico').all()
else:
# Militante básico pode ver apenas a si mesmo
users = [current_user]
return render_template('dashboard.html', users=users) # Filtrar usuários baseado nas permissões
if not current_user.has_permission('system_config'):
if current_user.has_permission('manage_cr_sectors'):
users = [u for u in users if u.cr_id == current_user.cr_id]
elif current_user.has_permission('manage_sector_cells'):
users = [u for u in users if u.setor_id == current_user.setor_id]
elif current_user.has_permission('manage_cell_members'):
users = [u for u in users if u.celula_id == current_user.celula_id]
else:
users = []
# Carregar dados necessários antes de fechar a sessão
for user in users:
if user.militante:
user.militante.get_responsabilidades_list()
return render_template('dashboard.html', users=users)
finally:
db.close()
@app.route("/usuarios/<int:user_id>/otp/reset", methods=["POST"]) @app.route("/usuarios/<int:user_id>/otp/reset", methods=["POST"])
@require_login @require_login
@@ -754,19 +776,18 @@ def reset_password(user_id):
@app.route('/check_session') @app.route('/check_session')
def check_session(): def check_session():
if 'user_id' not in session: if not current_user.is_authenticated:
return jsonify({'status': 'expired'}) return jsonify({'status': 'expired'})
db = get_db_connection() db = get_db_connection()
try: try:
user = db.get(Usuario, session['user_id']) if current_user.is_session_expired():
if not user or user.is_session_expired(): current_user.logout()
user.logout()
db.commit() db.commit()
session.pop('user_id', None) logout_user()
return jsonify({'status': 'expired'}) return jsonify({'status': 'expired'})
user.update_last_activity() current_user.update_last_activity()
db.commit() db.commit()
return jsonify({'status': 'active'}) return jsonify({'status': 'active'})
finally: finally:
@@ -875,7 +896,7 @@ def list_pagamentos_cr(cr_id):
@app.route('/celulas/<int:celula_id>/pagamentos/novo', methods=['GET', 'POST']) @app.route('/celulas/<int:celula_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login @require_login
@require_instance_permission('REGISTER_CELL_PAYMENT') @require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id')
def novo_pagamento_celula(celula_id): def novo_pagamento_celula(celula_id):
if request.method == 'POST': if request.method == 'POST':
db = get_db_connection() db = get_db_connection()
@@ -896,7 +917,7 @@ def novo_pagamento_celula(celula_id):
@app.route('/setores/<int:setor_id>/pagamentos/novo', methods=['GET', 'POST']) @app.route('/setores/<int:setor_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login @require_login
@require_instance_permission('REGISTER_SECTOR_PAYMENT') @require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id')
def novo_pagamento_setor(setor_id): def novo_pagamento_setor(setor_id):
if request.method == 'POST': if request.method == 'POST':
db = get_db_connection() db = get_db_connection()
@@ -917,7 +938,7 @@ def novo_pagamento_setor(setor_id):
@app.route('/crs/<int:cr_id>/pagamentos/novo', methods=['GET', 'POST']) @app.route('/crs/<int:cr_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login @require_login
@require_instance_permission('REGISTER_CR_PAYMENT') @require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id')
def novo_pagamento_cr(cr_id): def novo_pagamento_cr(cr_id):
if request.method == 'POST': if request.method == 'POST':
db = get_db_connection() db = get_db_connection()
@@ -1184,6 +1205,11 @@ def avaliar_aspirante(user_id):
finally: finally:
db.close() db.close()
@app.template_filter('has_permission')
def has_permission(value, permission):
"""Verifica se o valor contém a permissão especificada."""
return bool(value & permission)
def create_app(): def create_app():
app = Flask(__name__) app = Flask(__name__)
# ... existing code ... # ... existing code ...
@@ -1195,6 +1221,10 @@ def init_system():
"""Inicializa o sistema com todos os usuários necessários""" """Inicializa o sistema com todos os usuários necessários"""
print("Inicializando sistema...") print("Inicializando sistema...")
# Inicializar banco de dados
print("Inicializando banco de dados...")
init_database()
# Criar admin # Criar admin
create_admin() create_admin()
@@ -1243,6 +1273,99 @@ def init_system():
print("Username: aligner, tester, deployer") print("Username: aligner, tester, deployer")
print("Senha: Test123!@#") print("Senha: Test123!@#")
@app.route('/militante/desligar/<int:id>', methods=['POST'])
@login_required
def desligar_militante(id):
"""Desliga um militante e desativa seu usuário associado"""
militante = db_session.query(Militante).get(id)
if militante is None:
flash('Militante não encontrado.', 'danger')
return redirect(url_for('listar_militantes'))
motivo = request.form.get('motivo')
if not motivo:
flash('É necessário informar o motivo do desligamento.', 'danger')
return redirect(url_for('listar_militantes'))
# Atualizar estado do militante
militante.estado = EstadoMilitante.DESLIGADO.value
militante.data_desligamento = datetime.utcnow()
militante.motivo_desligamento = motivo
# Desativar usuário associado se existir
if militante.usuario:
militante.usuario.ativo = False
militante.usuario.ultimo_logout = datetime.utcnow()
militante.usuario.motivo_logout = "Desligamento do militante"
db_session.commit()
flash('Militante desligado com sucesso.', 'success')
return redirect(url_for('listar_militantes'))
@app.route('/editar_pagamento/<int:id>', methods=['GET', 'POST'])
@require_login
def editar_pagamento(id):
user = current_user
# Verificar permissões do usuário
if not (user.has_permission(Permission.EDIT_CELL_PAYMENT) or
user.has_permission(Permission.EDIT_SECTOR_PAYMENT) or
user.has_permission(Permission.EDIT_CR_PAYMENT) or
user.has_permission(Permission.EDIT_CC_PAYMENT)):
flash('Você não tem permissão para editar pagamentos.', 'error')
return redirect(url_for('home'))
pagamento = db_session.query(Pagamento).get(id)
if not pagamento:
flash('Pagamento não encontrado.', 'error')
return redirect(url_for('listar_pagamentos'))
if request.method == 'POST':
pagamento.valor = request.form['valor']
pagamento.data_pagamento = datetime.strptime(request.form['data_pagamento'], '%Y-%m-%d')
db_session.commit()
flash('Pagamento atualizado com sucesso!', 'success')
return redirect(url_for('listar_pagamentos'))
return render_template('editar_pagamento.html', pagamento=pagamento)
@app.before_request
def session_timeout():
"""Verifica se a sessão expirou"""
if current_user.is_authenticated:
if 'last_activity' not in session:
session['last_activity'] = time()
return
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
# Se passaram mais de 30 minutos (configurável)
timeout_minutes = 30
if now - last_activity > timedelta(minutes=timeout_minutes):
# Registrar o logout por timeout
try:
current_user.ultimo_logout = datetime.now()
current_user.motivo_logout = "Timeout de sessão"
db_session.commit()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning')
return redirect(url_for('login'))
# Atualizar timestamp de último acesso
session['last_activity'] = time()
# Atualizar também no banco de dados
try:
current_user.update_last_activity()
db_session.commit()
except Exception as e:
print(f"Erro ao atualizar última atividade: {e}")
if __name__ == '__main__': if __name__ == '__main__':
init_system() init_system()
app.run(debug=True) app.run(debug=True)

View File

@@ -10,7 +10,7 @@ def require_login(f):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if not current_user.is_authenticated: if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error') flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('login')) return redirect(url_for('login'))
db = get_db_connection() db = get_db_connection()
@@ -39,23 +39,14 @@ def require_permission(permission_name):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if not current_user.is_authenticated: if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error') flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('login')) return redirect(url_for('login'))
db = get_db_connection() if not current_user.has_permission(permission_name):
try: flash('Você não tem permissão para acessar esta página.', 'error')
user = db.query(Usuario).get(current_user.id) return redirect(url_for('home'))
if not user or not user.has_permission(permission_name):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade return f(*args, **kwargs)
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function return decorated_function
return decorator return decorator
@@ -116,37 +107,26 @@ def require_minimum_role(min_level):
return decorated_function return decorated_function
return decorator return decorator
def require_instance_permission(permission_name): def require_instance_permission(permission_name, instance_param):
"""Decorator para verificar se o usuário tem permissão em uma instância específica""" """Decorator para verificar se o usuário tem permissão em uma instância específica"""
def decorator(f): def decorator(f):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if 'user_id' not in session: if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error') flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('login')) return redirect(url_for('login'))
db = get_db_connection() # Obtém o ID da instância dos argumentos da função
try: instance_id = kwargs.get(instance_param)
user = db.query(Usuario).get(session['user_id']) if instance_id is None:
if not user: flash('ID da instância não encontrado.', 'error')
flash('Usuário não encontrado.', 'error') return redirect(url_for('home'))
return redirect(url_for('login'))
# Verificar se o usuário tem a permissão em alguma instância if not current_user.has_instance_permission(permission_name, instance_id):
if not (user.has_permission(permission_name) or flash('Você não tem permissão para acessar esta instância.', 'error')
user.has_permission(f"{permission_name}_sector") or return redirect(url_for('home'))
user.has_permission(f"{permission_name}_cr") or
user.has_permission(f"{permission_name}_cc")):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade return f(*args, **kwargs)
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function return decorated_function
return decorator return decorator
@@ -155,43 +135,34 @@ def require_instance_access(instance_type, instance_id):
def decorator(f): def decorator(f):
@wraps(f) @wraps(f)
def decorated_function(*args, **kwargs): def decorated_function(*args, **kwargs):
if 'user_id' not in session: if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error') flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('login')) return redirect(url_for('login'))
db = get_db_connection() # Verificar acesso baseado na instância do usuário
try: if instance_type == 'celula':
user = db.query(Usuario).get(session['user_id']) if not (current_user.celula_id == instance_id or
if not user: current_user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
flash('Usuário não encontrado.', 'error') current_user.has_permission(Permission.VIEW_CR_REPORTS) or
return redirect(url_for('login')) current_user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a esta célula.', 'error')
return redirect(url_for('index'))
elif instance_type == 'setor':
if not (current_user.setor_id == instance_id or
current_user.has_permission(Permission.VIEW_CR_REPORTS) or
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este setor.', 'error')
return redirect(url_for('index'))
elif instance_type == 'cr':
if not (current_user.cr_id == instance_id or
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este CR.', 'error')
return redirect(url_for('index'))
# Verificar acesso baseado na instância do usuário # Atualiza timestamp da última atividade
if instance_type == 'celula': current_user.update_last_activity()
if not (user.celula_id == instance_id or db_session.commit()
user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a esta célula.', 'error')
return redirect(url_for('index'))
elif instance_type == 'setor':
if not (user.setor_id == instance_id or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este setor.', 'error')
return redirect(url_for('index'))
elif instance_type == 'cr':
if not (user.cr_id == instance_id or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este CR.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade return f(*args, **kwargs)
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function return decorated_function
return decorator return decorator