From 9d17c66c4619cd86c2b3ae5c11cf450083c4fcb9 Mon Sep 17 00:00:00 2001 From: LS Date: Thu, 3 Apr 2025 17:10:43 -0300 Subject: [PATCH] =?UTF-8?q?fix:=20corrige=20uso=20do=20decorator=20require?= =?UTF-8?q?=5Finstance=5Fpermission=20-=20Modifica=20o=20decorator=20para?= =?UTF-8?q?=20aceitar=20o=20nome=20do=20par=C3=A2metro=20da=20inst=C3=A2nc?= =?UTF-8?q?ia=20-=20Atualiza=20as=20rotas=20de=20pagamentos=20para=20usar?= =?UTF-8?q?=20o=20decorator=20corretamente=20-=20Adiciona=20verifica=C3=A7?= =?UTF-8?q?=C3=A3o=20do=20ID=20da=20inst=C3=A2ncia=20nos=20argumentos=20da?= =?UTF-8?q?=20fun=C3=A7=C3=A3o=20-=20Melhora=20mensagens=20de=20erro=20par?= =?UTF-8?q?a=20casos=20de=20permiss=C3=A3o=20negada?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app.py | 263 +++++++++++++++++++++++++++++----------- functions/decorators.py | 125 ++++++++----------- 2 files changed, 241 insertions(+), 147 deletions(-) diff --git a/app.py b/app.py index f6d5632..1b0aedd 100644 --- a/app.py +++ b/app.py @@ -20,6 +20,7 @@ from functions.database import ( ComiteCentral, EmailMilitante, init_database, + EstadoMilitante, ) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, joinedload @@ -131,11 +132,11 @@ def login_required(f): def session_timeout(f): @wraps(f) def decorated_function(*args, **kwargs): - if 'user_id' not in session: - return redirect(url_for('login')) + if current_user.is_authenticated: + if 'last_activity' not in session: + session['last_activity'] = time() + return f(*args, **kwargs) - # Verificar se existe timestamp do último acesso - if 'last_activity' in session: last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() @@ -144,30 +145,27 @@ def session_timeout(f): if now - last_activity > timedelta(minutes=timeout_minutes): # Registrar o logout por timeout try: - user = db_session.query(Usuario).get(session['user_id']) - if user: - user.ultimo_logout = datetime.now() - user.motivo_logout = "Timeout de sessão" - db_session.commit() + current_user.ultimo_logout = datetime.now() + current_user.motivo_logout = "Timeout de sessão" + db_session.commit() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') return redirect(url_for('login')) - - # Atualizar timestamp de último acesso - session['last_activity'] = time() - - # Atualizar também no banco de dados - try: - user = db_session.query(Usuario).get(session['user_id']) - if user: - user.update_last_activity() - db_session.commit() - except Exception as e: - print(f"Erro ao atualizar última atividade: {e}") + # Atualizar timestamp de último acesso + session['last_activity'] = time() + + # Atualizar também no banco de dados + try: + current_user.update_last_activity() + db_session.commit() + except Exception as e: + print(f"Erro ao atualizar última atividade: {e}") + + return f(*args, **kwargs) return f(*args, **kwargs) return decorated_function @@ -357,8 +355,35 @@ def criar_militante(): @require_permission(Permission.VIEW_CELL_DATA) def listar_militantes(): """Lista todos os militantes""" - militantes = db_session.query(Militante).all() - return render_template("listar_militantes.html", militantes=militantes) + db = get_db_connection() + try: + # Adicionar opção de filtro por estado + estado = request.args.get('estado', 'todos') + + query = db.query(Militante) + if estado != 'todos': + query = query.filter(Militante.estado == estado) + + militantes = query.all() + + # Precomputar todas as responsabilidades antes de renderizar + for militante in militantes: + militante.is_responsavel_financas = bool(militante.responsabilidades & Militante.RESPONSAVEL_FINANCAS) + militante.is_responsavel_imprensa = bool(militante.responsabilidades & Militante.RESPONSAVEL_IMPRENSA) + militante.is_quadro_orientador = bool(militante.responsabilidades & Militante.QUADRO_ORIENTADOR) + + return render_template( + "listar_militantes.html", + militantes=militantes, + estado_atual=estado, + estados=[ + ('todos', 'Todos'), + (EstadoMilitante.ATIVO.value, 'Ativos'), + (EstadoMilitante.DESLIGADO.value, 'Desligados') + ] + ) + finally: + db.close() # Rota para criar uma nova cota mensal @app.route("/cotas/novo", methods=["GET", "POST"]) @@ -398,8 +423,7 @@ def listar_cotas(): @app.route("/pagamentos/novo", methods=["GET", "POST"]) @require_login def novo_pagamento(): - # Verificar permissões do usuário - user = db_session.query(Usuario).get(session['user_id']) + user = current_user # Verificar se o usuário tem permissão para registrar pagamentos em alguma instância if not (user.has_permission(Permission.REGISTER_CELL_PAYMENT) or @@ -433,28 +457,17 @@ def novo_pagamento(): @app.route("/pagamentos") @require_login def listar_pagamentos(): - """Lista todos os pagamentos""" - user = db_session.query(Usuario).get(session['user_id']) + user = current_user # Filtrar pagamentos baseado na instância do usuário if user.has_permission(Permission.REGISTER_CC_PAYMENT): - # Usuário do CC pode ver todos os pagamentos pagamentos = db_session.query(Pagamento).all() elif user.has_permission(Permission.REGISTER_CR_PAYMENT): - # Usuário do CR pode ver pagamentos do CR e setores/células abaixo - pagamentos = db_session.query(Pagamento).filter( - Pagamento.cr_id == user.cr_id - ).all() + pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_cr_instances())).all() elif user.has_permission(Permission.REGISTER_SECTOR_PAYMENT): - # Usuário do setor pode ver pagamentos do setor e células abaixo - pagamentos = db_session.query(Pagamento).filter( - Pagamento.setor_id == user.setor_id - ).all() + pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_sector_instances())).all() elif user.has_permission(Permission.REGISTER_CELL_PAYMENT): - # Usuário da célula pode ver apenas pagamentos da célula - pagamentos = db_session.query(Pagamento).filter( - Pagamento.celula_id == user.celula_id - ).all() + pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_cell_instances())).all() else: flash('Você não tem permissão para visualizar pagamentos.', 'error') return redirect(url_for('home')) @@ -668,27 +681,36 @@ def editar_militante(id): @app.route('/dash') @require_login def dashboard_admin(): - # Filtrar usuários baseado na hierarquia - if current_user.has_permission('system_config'): - # Secretário Geral e Secretário de Organização podem ver tudo - users = db_session.query(Usuario).all() - elif current_user.has_permission('manage_cc_crs'): - # Membro do CC pode ver tudo - users = db_session.query(Usuario).all() - elif current_user.has_permission('manage_cr_sectors'): - # Secretário de CR pode ver apenas membros do seu CR - users = db_session.query(Usuario).join(Usuario.roles).filter(Role.nome == 'membro_setor').all() - elif current_user.has_permission('manage_sector_cells'): - # Secretário de Setor pode ver apenas membros do seu setor - users = db_session.query(Usuario).join(Usuario.roles).filter(Role.nome == 'membro_celula').all() - elif current_user.has_permission('manage_cell_members'): - # Secretário de Célula pode ver apenas membros da sua célula - users = db_session.query(Usuario).join(Usuario.roles).filter(Role.nome == 'militante_basico').all() - else: - # Militante básico pode ver apenas a si mesmo - users = [current_user] - - return render_template('dashboard.html', users=users) + """ + Rota para o dashboard administrativo + """ + db = get_db_connection() + try: + # Carregar usuários com seus relacionamentos + users = db.query(Usuario).options( + joinedload(Usuario.roles), + joinedload(Usuario.militante) + ).all() + + # Filtrar usuários baseado nas permissões + if not current_user.has_permission('system_config'): + if current_user.has_permission('manage_cr_sectors'): + users = [u for u in users if u.cr_id == current_user.cr_id] + elif current_user.has_permission('manage_sector_cells'): + users = [u for u in users if u.setor_id == current_user.setor_id] + elif current_user.has_permission('manage_cell_members'): + users = [u for u in users if u.celula_id == current_user.celula_id] + else: + users = [] + + # Carregar dados necessários antes de fechar a sessão + for user in users: + if user.militante: + user.militante.get_responsabilidades_list() + + return render_template('dashboard.html', users=users) + finally: + db.close() @app.route("/usuarios//otp/reset", methods=["POST"]) @require_login @@ -754,19 +776,18 @@ def reset_password(user_id): @app.route('/check_session') def check_session(): - if 'user_id' not in session: + if not current_user.is_authenticated: return jsonify({'status': 'expired'}) db = get_db_connection() try: - user = db.get(Usuario, session['user_id']) - if not user or user.is_session_expired(): - user.logout() + if current_user.is_session_expired(): + current_user.logout() db.commit() - session.pop('user_id', None) + logout_user() return jsonify({'status': 'expired'}) - user.update_last_activity() + current_user.update_last_activity() db.commit() return jsonify({'status': 'active'}) finally: @@ -875,7 +896,7 @@ def list_pagamentos_cr(cr_id): @app.route('/celulas//pagamentos/novo', methods=['GET', 'POST']) @require_login -@require_instance_permission('REGISTER_CELL_PAYMENT') +@require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id') def novo_pagamento_celula(celula_id): if request.method == 'POST': db = get_db_connection() @@ -896,7 +917,7 @@ def novo_pagamento_celula(celula_id): @app.route('/setores//pagamentos/novo', methods=['GET', 'POST']) @require_login -@require_instance_permission('REGISTER_SECTOR_PAYMENT') +@require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id') def novo_pagamento_setor(setor_id): if request.method == 'POST': db = get_db_connection() @@ -917,7 +938,7 @@ def novo_pagamento_setor(setor_id): @app.route('/crs//pagamentos/novo', methods=['GET', 'POST']) @require_login -@require_instance_permission('REGISTER_CR_PAYMENT') +@require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id') def novo_pagamento_cr(cr_id): if request.method == 'POST': db = get_db_connection() @@ -1184,6 +1205,11 @@ def avaliar_aspirante(user_id): finally: db.close() +@app.template_filter('has_permission') +def has_permission(value, permission): + """Verifica se o valor contém a permissão especificada.""" + return bool(value & permission) + def create_app(): app = Flask(__name__) # ... existing code ... @@ -1195,6 +1221,10 @@ def init_system(): """Inicializa o sistema com todos os usuários necessários""" print("Inicializando sistema...") + # Inicializar banco de dados + print("Inicializando banco de dados...") + init_database() + # Criar admin create_admin() @@ -1243,6 +1273,99 @@ def init_system(): print("Username: aligner, tester, deployer") print("Senha: Test123!@#") +@app.route('/militante/desligar/', methods=['POST']) +@login_required +def desligar_militante(id): + """Desliga um militante e desativa seu usuário associado""" + militante = db_session.query(Militante).get(id) + if militante is None: + flash('Militante não encontrado.', 'danger') + return redirect(url_for('listar_militantes')) + + motivo = request.form.get('motivo') + if not motivo: + flash('É necessário informar o motivo do desligamento.', 'danger') + return redirect(url_for('listar_militantes')) + + # Atualizar estado do militante + militante.estado = EstadoMilitante.DESLIGADO.value + militante.data_desligamento = datetime.utcnow() + militante.motivo_desligamento = motivo + + # Desativar usuário associado se existir + if militante.usuario: + militante.usuario.ativo = False + militante.usuario.ultimo_logout = datetime.utcnow() + militante.usuario.motivo_logout = "Desligamento do militante" + + db_session.commit() + flash('Militante desligado com sucesso.', 'success') + return redirect(url_for('listar_militantes')) + +@app.route('/editar_pagamento/', methods=['GET', 'POST']) +@require_login +def editar_pagamento(id): + user = current_user + + # Verificar permissões do usuário + if not (user.has_permission(Permission.EDIT_CELL_PAYMENT) or + user.has_permission(Permission.EDIT_SECTOR_PAYMENT) or + user.has_permission(Permission.EDIT_CR_PAYMENT) or + user.has_permission(Permission.EDIT_CC_PAYMENT)): + flash('Você não tem permissão para editar pagamentos.', 'error') + return redirect(url_for('home')) + + pagamento = db_session.query(Pagamento).get(id) + if not pagamento: + flash('Pagamento não encontrado.', 'error') + return redirect(url_for('listar_pagamentos')) + + if request.method == 'POST': + pagamento.valor = request.form['valor'] + pagamento.data_pagamento = datetime.strptime(request.form['data_pagamento'], '%Y-%m-%d') + + db_session.commit() + flash('Pagamento atualizado com sucesso!', 'success') + return redirect(url_for('listar_pagamentos')) + + return render_template('editar_pagamento.html', pagamento=pagamento) + +@app.before_request +def session_timeout(): + """Verifica se a sessão expirou""" + if current_user.is_authenticated: + if 'last_activity' not in session: + session['last_activity'] = time() + return + + last_activity = datetime.fromtimestamp(session['last_activity']) + now = datetime.now() + + # Se passaram mais de 30 minutos (configurável) + timeout_minutes = 30 + if now - last_activity > timedelta(minutes=timeout_minutes): + # Registrar o logout por timeout + try: + current_user.ultimo_logout = datetime.now() + current_user.motivo_logout = "Timeout de sessão" + db_session.commit() + except Exception as e: + print(f"Erro ao registrar logout por timeout: {e}") + + session.clear() + flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') + return redirect(url_for('login')) + + # Atualizar timestamp de último acesso + session['last_activity'] = time() + + # Atualizar também no banco de dados + try: + current_user.update_last_activity() + db_session.commit() + except Exception as e: + print(f"Erro ao atualizar última atividade: {e}") + if __name__ == '__main__': init_system() app.run(debug=True) diff --git a/functions/decorators.py b/functions/decorators.py index 69840c1..c7bb0cb 100644 --- a/functions/decorators.py +++ b/functions/decorators.py @@ -10,7 +10,7 @@ def require_login(f): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: - flash('Você precisa estar logado para acessar esta página.', 'error') + flash('Por favor, faça login para acessar esta página.', 'error') return redirect(url_for('login')) db = get_db_connection() @@ -39,23 +39,14 @@ def require_permission(permission_name): @wraps(f) def decorated_function(*args, **kwargs): if not current_user.is_authenticated: - flash('Você precisa estar logado para acessar esta página.', 'error') + flash('Por favor, faça login para acessar esta página.', 'error') return redirect(url_for('login')) - db = get_db_connection() - try: - user = db.query(Usuario).get(current_user.id) - if not user or not user.has_permission(permission_name): - flash('Você não tem permissão para acessar esta página.', 'error') - return redirect(url_for('index')) - - # Atualiza timestamp da última atividade - user.update_last_activity() - db.commit() - - return f(*args, **kwargs) - finally: - db.close() + if not current_user.has_permission(permission_name): + flash('Você não tem permissão para acessar esta página.', 'error') + return redirect(url_for('home')) + + return f(*args, **kwargs) return decorated_function return decorator @@ -116,37 +107,26 @@ def require_minimum_role(min_level): return decorated_function return decorator -def require_instance_permission(permission_name): +def require_instance_permission(permission_name, instance_param): """Decorator para verificar se o usuário tem permissão em uma instância específica""" def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): - if 'user_id' not in session: - flash('Você precisa estar logado para acessar esta página.', 'error') + if not current_user.is_authenticated: + flash('Por favor, faça login para acessar esta página.', 'error') return redirect(url_for('login')) - db = get_db_connection() - try: - user = db.query(Usuario).get(session['user_id']) - if not user: - flash('Usuário não encontrado.', 'error') - return redirect(url_for('login')) - - # Verificar se o usuário tem a permissão em alguma instância - if not (user.has_permission(permission_name) or - user.has_permission(f"{permission_name}_sector") or - user.has_permission(f"{permission_name}_cr") or - user.has_permission(f"{permission_name}_cc")): - flash('Você não tem permissão para acessar esta página.', 'error') - return redirect(url_for('index')) - - # Atualiza timestamp da última atividade - user.update_last_activity() - db.commit() - - return f(*args, **kwargs) - finally: - db.close() + # Obtém o ID da instância dos argumentos da função + instance_id = kwargs.get(instance_param) + if instance_id is None: + flash('ID da instância não encontrado.', 'error') + return redirect(url_for('home')) + + if not current_user.has_instance_permission(permission_name, instance_id): + flash('Você não tem permissão para acessar esta instância.', 'error') + return redirect(url_for('home')) + + return f(*args, **kwargs) return decorated_function return decorator @@ -155,43 +135,34 @@ def require_instance_access(instance_type, instance_id): def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): - if 'user_id' not in session: - flash('Você precisa estar logado para acessar esta página.', 'error') + if not current_user.is_authenticated: + flash('Por favor, faça login para acessar esta página.', 'error') return redirect(url_for('login')) - db = get_db_connection() - try: - user = db.query(Usuario).get(session['user_id']) - if not user: - flash('Usuário não encontrado.', 'error') - return redirect(url_for('login')) - - # Verificar acesso baseado na instância do usuário - if instance_type == 'celula': - if not (user.celula_id == instance_id or - user.has_permission(Permission.VIEW_SECTOR_REPORTS) or - user.has_permission(Permission.VIEW_CR_REPORTS) or - user.has_permission(Permission.VIEW_CC_REPORTS)): - flash('Você não tem acesso a esta célula.', 'error') - return redirect(url_for('index')) - elif instance_type == 'setor': - if not (user.setor_id == instance_id or - user.has_permission(Permission.VIEW_CR_REPORTS) or - user.has_permission(Permission.VIEW_CC_REPORTS)): - flash('Você não tem acesso a este setor.', 'error') - return redirect(url_for('index')) - elif instance_type == 'cr': - if not (user.cr_id == instance_id or - user.has_permission(Permission.VIEW_CC_REPORTS)): - flash('Você não tem acesso a este CR.', 'error') - return redirect(url_for('index')) - - # Atualiza timestamp da última atividade - user.update_last_activity() - db.commit() - - return f(*args, **kwargs) - finally: - db.close() + # Verificar acesso baseado na instância do usuário + if instance_type == 'celula': + if not (current_user.celula_id == instance_id or + current_user.has_permission(Permission.VIEW_SECTOR_REPORTS) or + current_user.has_permission(Permission.VIEW_CR_REPORTS) or + current_user.has_permission(Permission.VIEW_CC_REPORTS)): + flash('Você não tem acesso a esta célula.', 'error') + return redirect(url_for('index')) + elif instance_type == 'setor': + if not (current_user.setor_id == instance_id or + current_user.has_permission(Permission.VIEW_CR_REPORTS) or + current_user.has_permission(Permission.VIEW_CC_REPORTS)): + flash('Você não tem acesso a este setor.', 'error') + return redirect(url_for('index')) + elif instance_type == 'cr': + if not (current_user.cr_id == instance_id or + current_user.has_permission(Permission.VIEW_CC_REPORTS)): + flash('Você não tem acesso a este CR.', 'error') + return redirect(url_for('index')) + + # Atualiza timestamp da última atividade + current_user.update_last_activity() + db_session.commit() + + return f(*args, **kwargs) return decorated_function return decorator \ No newline at end of file