from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify, send_file from functions.database import ( Base, Militante, CotaMensal, TipoPagamento, Pagamento, MaterialVendido, TipoMaterial, VendaJornalAvulso, engine, AssinaturaAnual, RelatorioCotasMensais, RelatorioVendasMateriais, Usuario, get_db_connection, ComiteRegional, Setor, Celula, ComiteCentral, EmailMilitante, init_database, EstadoMilitante, ) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, joinedload from datetime import datetime, timedelta from flask_bootstrap import Bootstrap5 from routes.cota import cota_bp from functions.validations import validar_cpf from functools import wraps from pathlib import Path from time import time from flask_mail import Mail, Message import os from dotenv import load_dotenv from functions.rbac import Role, Permission, init_rbac from functions.decorators import require_permission, require_role, require_minimum_role, require_login, require_instance_permission, require_instance_access import re import secrets import pyotp import qrcode import base64 from io import BytesIO from create_admin import create_admin from create_test_users import create_test_users from werkzeug.security import generate_password_hash, check_password_hash from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user import random import string from sqlalchemy.sql import func load_dotenv() app = Flask(__name__) app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16)) bootstrap = Bootstrap5(app) # Configurar Flask-Login login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' @login_manager.user_loader def load_user(user_id): """Carrega o usuário pelo ID""" db = get_db_connection() try: # Carregar o usuário com suas roles user = db.query(Usuario).options( joinedload(Usuario.roles) ).get(user_id) return user finally: db.close() def generate_qr_code(user): """Gera um QR code para o usuário""" if not user.otp_secret: user.otp_secret = pyotp.random_base32() totp = pyotp.TOTP(user.otp_secret) qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles")) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = BytesIO() img.save(buffer, format="PNG") qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8') return qr_code # Registrar blueprints app.register_blueprint(cota_bp) # Configuração da sessão do SQLAlchemy db_session = get_db_connection() # Configurar Flask-Mail app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER', 'smtp.gmail.com') app.config['MAIL_PORT'] = int(os.getenv('MAIL_PORT', 587)) app.config['MAIL_USE_TLS'] = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true' app.config['MAIL_USERNAME'] = os.getenv('MAIL_USERNAME') app.config['MAIL_PASSWORD'] = os.getenv('MAIL_PASSWORD') app.config['MAIL_DEFAULT_SENDER'] = os.getenv('MAIL_DEFAULT_SENDER') mail = Mail(app) # Inicializar banco de dados e RBAC print("Inicializando banco de dados...") init_database() print("Inicializando sistema RBAC...") init_rbac() # Criar admin e usuários de teste print("Criando usuários iniciais...") create_admin() create_test_users() # Decorator para verificar se o usuário está logado def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('Por favor, faça login para acessar esta página.', 'warning') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function # Decorator para verificar se a sessão expirou def session_timeout(f): @wraps(f) def decorated_function(*args, **kwargs): if current_user.is_authenticated: if 'last_activity' not in session: session['last_activity'] = time() return f(*args, **kwargs) last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() # Se passaram mais de 30 minutos (configurável) timeout_minutes = 30 if now - last_activity > timedelta(minutes=timeout_minutes): # Registrar o logout por timeout try: current_user.ultimo_logout = datetime.now() current_user.motivo_logout = "Timeout de sessão" db_session.commit() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') return redirect(url_for('login')) # Atualizar timestamp de último acesso session['last_activity'] = time() # Atualizar também no banco de dados try: current_user.update_last_activity() db_session.commit() except Exception as e: print(f"Erro ao atualizar última atividade: {e}") return f(*args, **kwargs) return f(*args, **kwargs) return decorated_function # Rota raiz - redireciona para login se não estiver autenticado @app.route("/") @require_login def index(): """Rota principal""" return redirect(url_for('home')) # Rota de login @app.route("/login", methods=["GET", "POST"]) def login(): """Rota de login""" if request.method == "POST": username = request.form.get("username") password = request.form.get("password") otp_code = request.form.get("otp_code") if not all([username, password, otp_code]): flash("Todos os campos são obrigatórios.", "error") return redirect(url_for("login")) db = get_db_connection() try: user = db.query(Usuario).filter_by(username=username).first() if not user or not user.check_password(password): flash("Usuário ou senha incorretos.", "error") return redirect(url_for("login")) if not user.verify_otp(otp_code): flash("Código OTP inválido.", "error") return redirect(url_for("login")) # Atualizar último login user.ultimo_login = datetime.utcnow() db.commit() # Fazer login login_user(user) # Redirecionar para home return redirect(url_for("home")) finally: db.close() return render_template("login.html") # Rota de logout @app.route("/logout") @login_required def logout(): db = get_db_connection() try: user = current_user if user: user.logout() db.commit() logout_user() finally: db.close() flash('Logout realizado com sucesso!', 'success') return redirect(url_for('login')) # Rota home (protegida) @app.route("/home") @require_login def home(): """Página inicial do sistema com dashboard""" try: # Formatar data atual em português meses = { 1: 'janeiro', 2: 'fevereiro', 3: 'março', 4: 'abril', 5: 'maio', 6: 'junho', 7: 'julho', 8: 'agosto', 9: 'setembro', 10: 'outubro', 11: 'novembro', 12: 'dezembro' } data_atual = datetime.now() data_formatada = f"{data_atual.day} de {meses[data_atual.month]} de {data_atual.year}" # Buscar totais total_militantes = db_session.query(Militante).count() total_cotas = db_session.query(func.sum(CotaMensal.valor_novo)).scalar() or 0 total_materiais = db_session.query(MaterialVendido).count() total_assinaturas = db_session.query(AssinaturaAnual).filter( AssinaturaAnual.data_fim >= datetime.now() ).count() # Buscar últimos militantes cadastrados ultimos_militantes = db_session.query(Militante)\ .order_by(Militante.id.desc())\ .limit(5)\ .all() # Buscar últimos pagamentos ultimos_pagamentos = db_session.query(Pagamento)\ .join(Militante)\ .order_by(Pagamento.data_pagamento.desc())\ .limit(5)\ .all() return render_template('home.html', data_atual=data_formatada, total_militantes=total_militantes, total_cotas="{:.2f}".format(total_cotas), total_materiais=total_materiais, total_assinaturas=total_assinaturas, ultimos_militantes=ultimos_militantes, ultimos_pagamentos=ultimos_pagamentos) except Exception as e: print(f"Erro na página inicial: {e}") import traceback traceback.print_exc() flash('Erro ao carregar a página inicial', 'error') return render_template('home.html', data_atual=datetime.now().strftime("%d/%m/%Y"), total_militantes=0, total_cotas="0.00", total_materiais=0, total_assinaturas=0, ultimos_militantes=[], ultimos_pagamentos=[]) # Rota para criar um novo militante @app.route('/militantes/criar', methods=['GET', 'POST']) @require_login @require_permission('gerenciar_militantes') def criar_militante(): if request.method == 'POST': try: nome = request.form['nome'] email = request.form['email'] cpf = request.form['cpf'] titulo_eleitoral = request.form['titulo_eleitoral'] data_nascimento = datetime.strptime(request.form['data_nascimento'], '%Y-%m-%d').date() if request.form['data_nascimento'] else None data_entrada_oci = datetime.strptime(request.form['data_entrada_oci'], '%Y-%m-%d').date() if request.form['data_entrada_oci'] else None data_efetivacao_oci = datetime.strptime(request.form['data_efetivacao_oci'], '%Y-%m-%d').date() if request.form['data_efetivacao_oci'] else None telefone1 = request.form['telefone1'] telefone2 = request.form['telefone2'] profissao = request.form['profissao'] regime_trabalho = request.form['regime_trabalho'] empresa = request.form['empresa'] contratante = request.form['contratante'] instituicao_ensino = request.form['instituicao_ensino'] tipo_instituicao = request.form['tipo_instituicao'] sindicato = request.form['sindicato'] cargo_sindical = request.form['cargo_sindical'] dirigente_sindical = 'dirigente_sindical' in request.form central_sindical = request.form['central_sindical'] setor_id = request.form['setor_id'] celula_id = request.form['celula_id'] # Processar responsabilidades responsabilidades = 0 if 'responsabilidades' in request.form: for responsabilidade in request.form.getlist('responsabilidades'): responsabilidades |= int(responsabilidade) militante = Militante( nome=nome, email=email, cpf=cpf, titulo_eleitoral=titulo_eleitoral, data_nascimento=data_nascimento, data_entrada_oci=data_entrada_oci, data_efetivacao_oci=data_efetivacao_oci, telefone1=telefone1, telefone2=telefone2, profissao=profissao, regime_trabalho=regime_trabalho, empresa=empresa, contratante=contratante, instituicao_ensino=instituicao_ensino, tipo_instituicao=tipo_instituicao, sindicato=sindicato, cargo_sindical=cargo_sindical, dirigente_sindical=dirigente_sindical, central_sindical=central_sindical, setor_id=setor_id, celula_id=celula_id, responsabilidades=responsabilidades ) db_session.add(militante) db_session.commit() flash('Militante criado com sucesso!', 'success') return redirect(url_for('listar_militantes')) except Exception as e: db_session.rollback() flash(f'Erro ao criar militante: {str(e)}', 'danger') return redirect(url_for('criar_militante')) setores = Setor.query.all() celulas = Celula.query.all() return render_template('criar_militante.html', setores=setores, celulas=celulas) # Rota para listar militantes @app.route("/militantes") @require_login @require_permission(Permission.VIEW_CELL_DATA) def listar_militantes(): try: militantes = db_session.query(Militante).order_by(Militante.nome).all() return render_template("listar_militantes.html", militantes=militantes) except Exception as e: print(f"Erro ao listar militantes: {e}") flash('Erro ao carregar a lista de militantes', 'error') return render_template("listar_militantes.html", militantes=[]) @app.route("/militantes/excluir/", methods=["POST"]) @login_required @session_timeout def excluir_militante(id): try: militante = db_session.query(Militante).get(id) if not militante: flash('Militante não encontrado.', 'error') return redirect(url_for('listar_militantes')) # Verificar se existem registros relacionados tem_cotas = db_session.query(CotaMensal).filter_by(militante_id=id).first() is not None tem_pagamentos = db_session.query(Pagamento).filter_by(militante_id=id).first() is not None tem_materiais = db_session.query(MaterialVendido).filter_by(militante_id=id).first() is not None tem_vendas = db_session.query(VendaJornalAvulso).filter_by(militante_id=id).first() is not None tem_assinaturas = db_session.query(AssinaturaAnual).filter_by(militante_id=id).first() is not None if any([tem_cotas, tem_pagamentos, tem_materiais, tem_vendas, tem_assinaturas]): flash('Não é possível excluir o militante pois existem registros relacionados.', 'error') return redirect(url_for('listar_militantes')) db_session.delete(militante) db_session.commit() flash('Militante excluído com sucesso!', 'success') except Exception as e: print(f"Erro ao excluir militante: {e}") db_session.rollback() flash('Erro ao excluir militante.', 'error') return redirect(url_for('listar_militantes')) # Rota para criar uma nova cota mensal @app.route("/cotas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def nova_cota(): if request.method == "POST": cotas_mensais = CotaMensal( militante_id=request.form["militante_id"], valor_antigo=request.form["valor_antigo"], valor_novo=request.form["valor_novo"], data_alteracao=datetime.strptime(request.form["data_alteracao"], "%Y-%m-%d") ) db_session.add(cotas_mensais) try: db_session.commit() return redirect(url_for("listar_cotas")) except Exception as e: print(e) db_session.rollback() flash('Erro ao cadastrar cota mensal. Verifique se os dados fornecidos são válidos.', 'error') return render_template("nova_cota.html") return render_template("nova_cota.html") # Rota para listar cotas mensais @app.route("/cotas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_cotas(): """Lista todas as cotas""" cotas = db_session.query(CotaMensal).all() return render_template("listar_cotas.html", cotas=cotas) # Rota para criar um novo pagamento @app.route("/pagamentos/novo", methods=["GET", "POST"]) @require_login def novo_pagamento(): """Rota para criar um novo pagamento""" user = current_user # Verificar se o usuário tem permissão para registrar pagamentos em alguma instância if not (user.has_permission(Permission.REGISTER_CELL_PAYMENT) or user.has_permission(Permission.REGISTER_SECTOR_PAYMENT) or user.has_permission(Permission.REGISTER_CR_PAYMENT) or user.has_permission(Permission.REGISTER_CC_PAYMENT)): flash('Você não tem permissão para registrar pagamentos.', 'error') return redirect(url_for('home')) if request.method == "POST": try: militante_id = request.form.get("militante_id") tipo_pagamento_id = request.form.get("tipo_pagamento_id") valor = request.form.get("valor") data_pagamento = request.form.get("data_pagamento") # Validações if not all([militante_id, tipo_pagamento_id, valor, data_pagamento]): flash("Todos os campos são obrigatórios.", "danger") return redirect(url_for("novo_pagamento")) # Criar novo pagamento pagamento = Pagamento( militante_id=militante_id, tipo_pagamento_id=tipo_pagamento_id, valor=float(valor), data_pagamento=datetime.strptime(data_pagamento, "%Y-%m-%d") ) db_session.add(pagamento) db_session.commit() flash("Pagamento registrado com sucesso!", "success") return redirect(url_for("listar_pagamentos")) except Exception as e: db_session.rollback() app.logger.error(f"Erro ao registrar pagamento: {str(e)}") flash("Erro ao registrar pagamento. Por favor, tente novamente.", "danger") return redirect(url_for("novo_pagamento")) # GET - Renderizar formulário militantes = db_session.query(Militante).order_by(Militante.nome).all() tipos_pagamento = db_session.query(TipoPagamento).order_by(TipoPagamento.descricao).all() hoje = datetime.now().strftime("%Y-%m-%d") return render_template("novo_pagamento.html", militantes=militantes, tipos_pagamento=tipos_pagamento, hoje=hoje) # Rota para listar pagamentos @app.route("/pagamentos") @require_login def listar_pagamentos(): user = current_user # Filtrar pagamentos baseado na instância do usuário if user.has_permission(Permission.REGISTER_CC_PAYMENT): pagamentos = db_session.query(Pagamento).all() elif user.has_permission(Permission.REGISTER_CR_PAYMENT): pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_cr_instances())).all() elif user.has_permission(Permission.REGISTER_SECTOR_PAYMENT): pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_sector_instances())).all() elif user.has_permission(Permission.REGISTER_CELL_PAYMENT): pagamentos = db_session.query(Pagamento).filter(Pagamento.instancia_id.in_(user.get_cell_instances())).all() else: flash('Você não tem permissão para visualizar pagamentos.', 'error') return redirect(url_for('home')) return render_template("listar_pagamentos.html", pagamentos=pagamentos) # Rota para criar um novo material vendido @app.route("/materiais/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_material(): if request.method == "POST": materiais_vendidos = MaterialVendido( militante_id=request.form["militante_id"], tipo_material_id=request.form["tipo_material_id"], descricao=request.form["descricao"], valor=request.form["valor"], data_venda=datetime.strptime(request.form["data_venda"], "%Y-%m-%d"), ) db_session.add(materiais_vendidos) try: db_session.commit() return redirect(url_for("listar_materiais")) except Exception as e: print(e) db_session.rollback() flash('Erro ao cadastrar material vendido. Verifique se os dados fornecidos são válidos.', 'error') return render_template("novo_material.html") return render_template("novo_material.html") # Rota para listar materiais vendidos @app.route("/materiais") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_materiais(): """Lista todos os materiais""" materiais = db_session.query(MaterialVendido).all() return render_template("listar_materiais.html", materiais=materiais) # Rota para criar uma nova venda de jornais avulsos @app.route("/jornais/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def nova_venda_jornal(): if request.method == "POST": vendas_jornais_avulsos = VendaJornalAvulso( militante_id=request.form["militante_id"], quantidade=request.form["quantidade"], valor_total=request.form["valor_total"], data_venda=datetime.strptime(request.form["data_venda"], "%Y-%m-%d"), ) db_session.add(vendas_jornais_avulsos) try: db_session.commit() return redirect(url_for("listar_vendas_jornal")) except Exception as e: print(e) db_session.rollback() flash('Erro ao cadastrar venda de jornal avulso. Verifique se os dados fornecidos são válidos.', 'error') return render_template("nova_venda_jornal.html") return render_template("nova_venda_jornal.html") # Rota para listar vendas de jornais avulsos @app.route("/jornais") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_vendas_jornal(): """Lista todas as vendas de jornal""" vendas = db_session.query(VendaJornalAvulso).all() return render_template("listar_vendas_jornal.html", vendas=vendas) # Rota para criar uma nova assinatura anual @app.route("/assinaturas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def nova_assinatura(): if request.method == "POST": assinaturas_anuais = AssinaturaAnual( militante_id=request.form["militante_id"], tipo_material_id=request.form["tipo_material_id"], quantidade=request.form["quantidade"], valor_total=request.form["valor_total"], data_inicio=datetime.strptime(request.form["data_inicio"], "%Y-%m-%d"), data_fim=datetime.strptime(request.form["data_fim"], "%Y-%m-%d") ) db_session.add(assinaturas_anuais) try: db_session.commit() return redirect(url_for("listar_assinaturas")) except Exception as e: print(e) db_session.rollback() flash('Erro ao cadastrar assinatura anual. Verifique se os dados fornecidos são válidos.', 'error') return render_template("nova_assinatura.html") return render_template("nova_assinatura.html") # Rota para listar assinaturas anuais @app.route("/assinaturas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_assinaturas(): """Lista todas as assinaturas""" assinaturas = db_session.query(AssinaturaAnual).all() return render_template("listar_assinaturas.html", assinaturas=assinaturas) # Rota para criar um novo relatório de cotas mensais @app.route("/relatorios/cotas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_relatorio_cotas(): if request.method == "POST": relatorio_cotas_mensais = RelatorioCotasMensais( setor_id=request.form["setor_id"], comite_id=request.form["comite_id"], total_cotas=request.form["total_cotas"], data_relatorio=datetime.strptime(request.form["data_relatorio"], "%Y-%m-%d") ) db_session.add(relatorio_cotas_mensais) try: db_session.commit() return redirect(url_for("listar_relatorios_cotas")) except Exception as e: print(e) db_session.rollback() flash('Erro ao cadastrar relatório de cotas mensais. Verifique se os dados fornecidos são válidos.', 'error') return render_template("novo_relatorio_cotas.html") return render_template("novo_relatorio_cotas.html") # Rota para listar relatórios de cotas mensais @app.route("/relatorios/cotas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_relatorios_cotas(): relatorios = db_session.query(RelatorioCotasMensais).all() return render_template("listar_relatorios_cotas.html", relatorios=relatorios) # Rota para criar um novo relatório de vendas de materiais @app.route("/relatorios/vendas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_relatorio_vendas(): if request.method == "POST": relatorio_vendas_materiais = RelatorioVendasMateriais( setor_id=request.form["setor_id"], comite_id=request.form["comite_id"], total_vendas=request.form["total_vendas"], data_relatorio=datetime.strptime(request.form["data_relatorio"], "%Y-%m-%d") ) db_session.add(relatorio_vendas_materiais) try: db_session.commit() return redirect(url_for("listar_relatorios_vendas")) except Exception as e: print(e) db_session.rollback() flash('Erro ao cadastrar relatório de vendas de materiais. Verifique se os dados fornecidos são válidos.', 'error') return render_template("novo_relatorio_vendas.html") return render_template("novo_relatorio_vendas.html") # Rota para listar relatórios de vendas de materiais @app.route("/relatorios/vendas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_relatorios_vendas(): relatorios = db_session.query(RelatorioVendasMateriais).all() return render_template("listar_relatorios_vendas.html", relatorios=relatorios) @app.route('/militantes//editar', methods=['GET', 'POST']) @require_login @require_permission(Permission.MANAGE_CELL_MEMBERS) def editar_militante(id): militante = db_session.query(Militante).get(id) if not militante: flash('Militante não encontrado.', 'error') return redirect(url_for('listar_militantes')) if request.method == "POST": cpf = request.form["cpf"] if cpf != militante.cpf and not validar_cpf(cpf): # Só valida se o CPF foi alterado flash('CPF inválido. Por favor, verifique o número informado.', 'error') return render_template("editar_militante.html", militante=militante) try: militante.nome = request.form["nome"] militante.cpf = cpf militante.email = request.form["email"] militante.telefone = request.form["telefone"] militante.endereco = request.form["endereco"] militante.filiado = bool(request.form.get("filiado", False)) db_session.commit() flash('Militante atualizado com sucesso!', 'success') return redirect(url_for('listar_militantes')) except Exception as e: db_session.rollback() flash('Erro ao atualizar militante. Verifique se o CPF ou email já não estão cadastrados.', 'error') return render_template("editar_militante.html", militante=militante) return render_template("editar_militante.html", militante=militante) @app.route('/dash') @require_login def dashboard_admin(): """ Rota para o dashboard administrativo """ db = get_db_connection() try: # Carregar usuários com seus relacionamentos users = db.query(Usuario).options( joinedload(Usuario.roles), joinedload(Usuario.militante) ).all() # Filtrar usuários baseado nas permissões if not current_user.has_permission('system_config'): if current_user.has_permission('manage_cr_sectors'): users = [u for u in users if u.cr_id == current_user.cr_id] elif current_user.has_permission('manage_sector_cells'): users = [u for u in users if u.setor_id == current_user.setor_id] elif current_user.has_permission('manage_cell_members'): users = [u for u in users if u.celula_id == current_user.celula_id] else: users = [] # Carregar dados necessários antes de fechar a sessão for user in users: if user.militante: user.militante.get_responsabilidades_list() return render_template('dashboard.html', users=users) finally: db.close() @app.route("/usuarios//otp/reset", methods=["POST"]) @require_login @require_permission('system_config') def reset_otp(user_id): """Resetar OTP de um usuário""" db = get_db_connection() try: user = db.query(Usuario).get(user_id) if not user: return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404 # Gerar novo OTP user.otp_secret = pyotp.random_base32() db.commit() return jsonify({'success': True, 'message': 'OTP resetado com sucesso'}) finally: db.close() @app.route("/usuarios//password/reset", methods=["POST"]) @require_login @require_permission('system_config') def reset_password(user_id): """Resetar senha de um usuário""" db = get_db_connection() try: user = db.query(Usuario).get(user_id) if not user: return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404 # Gerar nova senha aleatória new_password = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) user.password_hash = generate_password_hash(new_password) db.commit() # Enviar email com a nova senha try: msg = Message( 'Sua senha foi resetada', recipients=[user.email], body=f''' Olá {user.username}, Sua senha foi resetada pelo administrador do sistema. Sua nova senha é: {new_password} Por favor, altere sua senha após o primeiro login. Atenciosamente, Sistema de Controle OCI ''' ) mail.send(msg) return jsonify({'success': True, 'message': 'Senha resetada e enviada por email'}) except Exception as e: db.rollback() return jsonify({'success': False, 'message': f'Erro ao enviar email: {str(e)}'}), 500 finally: db.close() @app.route('/check_session') def check_session(): if not current_user.is_authenticated: return jsonify({'status': 'expired'}) db = get_db_connection() try: if current_user.is_session_expired(): current_user.logout() db.commit() logout_user() return jsonify({'status': 'expired'}) current_user.update_last_activity() db.commit() return jsonify({'status': 'active'}) finally: db.close() @app.route("/qr/") def get_qr_code(token): db = get_db_connection() try: user = db.query(Usuario).filter_by(username='admin').first() if not user: flash('Usuário não encontrado', 'error') return redirect(url_for('login')) qr_uri = user.get_otp_uri() return render_template('mostrar_qr_code.html', qr_uri=qr_uri) finally: db.close() # Adicionar nova rota para API de setores @app.route("/api/setores/") @require_login def get_setores(cr_id): setores = db_session.query(Setor).filter_by(cr_id=cr_id).all() return jsonify({ 'setores': [{'id': s.id, 'nome': s.nome} for s in setores] }) @app.route('/celulas//militantes') @require_login def list_militantes_celula(celula_id): @require_instance_access('celula', celula_id) def decorated_function(celula_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(celula_id=celula_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(celula_id) @app.route('/setores//militantes') @require_login def list_militantes_setor(setor_id): @require_instance_access('setor', setor_id) def decorated_function(setor_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(setor_id=setor_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(setor_id) @app.route('/crs//militantes') @require_login def list_militantes_cr(cr_id): @require_instance_access('cr', cr_id) def decorated_function(cr_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(cr_id=cr_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(cr_id) @app.route('/celulas//pagamentos') @require_login def list_pagamentos_celula(celula_id): @require_instance_access('celula', celula_id) def decorated_function(celula_id): db = get_db_connection() try: pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) finally: db.close() return decorated_function(celula_id) @app.route('/setores//pagamentos') @require_login def list_pagamentos_setor(setor_id): @require_instance_access('setor', setor_id) def decorated_function(setor_id): db = get_db_connection() try: pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) finally: db.close() return decorated_function(setor_id) @app.route('/crs//pagamentos') @require_login def list_pagamentos_cr(cr_id): @require_instance_access('cr', cr_id) def decorated_function(cr_id): db = get_db_connection() try: pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) finally: db.close() return decorated_function(cr_id) @app.route('/celulas//pagamentos/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id') def novo_pagamento_celula(celula_id): if request.method == 'POST': db = get_db_connection() try: pagamento = Pagamento( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], celula_id=celula_id ) db.add(pagamento) db.commit() flash('Pagamento registrado com sucesso!', 'success') return redirect(url_for('list_pagamentos_celula', celula_id=celula_id)) finally: db.close() return render_template('pagamentos/form.html') @app.route('/setores//pagamentos/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id') def novo_pagamento_setor(setor_id): if request.method == 'POST': db = get_db_connection() try: pagamento = Pagamento( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], setor_id=setor_id ) db.add(pagamento) db.commit() flash('Pagamento registrado com sucesso!', 'success') return redirect(url_for('list_pagamentos_setor', setor_id=setor_id)) finally: db.close() return render_template('pagamentos/form.html') @app.route('/crs//pagamentos/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id') def novo_pagamento_cr(cr_id): if request.method == 'POST': db = get_db_connection() try: pagamento = Pagamento( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], cr_id=cr_id ) db.add(pagamento) db.commit() flash('Pagamento registrado com sucesso!', 'success') return redirect(url_for('list_pagamentos_cr', cr_id=cr_id)) finally: db.close() return render_template('pagamentos/form.html') @app.route("/alterar_senha", methods=["GET", "POST"]) @require_login def alterar_senha(): """Rota para alterar a senha do usuário""" if request.method == "POST": senha_atual = request.form.get("senha_atual") nova_senha = request.form.get("nova_senha") confirmar_senha = request.form.get("confirmar_senha") if not all([senha_atual, nova_senha, confirmar_senha]): flash("Todos os campos são obrigatórios.", "error") return redirect(url_for("alterar_senha")) if nova_senha != confirmar_senha: flash("As senhas não coincidem.", "error") return redirect(url_for("alterar_senha")) db = get_db_connection() try: user = db.query(Usuario).get(current_user.id) if not user.check_password(senha_atual): flash("Senha atual incorreta.", "error") return redirect(url_for("alterar_senha")) user.password_hash = generate_password_hash(nova_senha) db.commit() flash("Senha alterada com sucesso!", "success") return redirect(url_for("home")) finally: db.close() return render_template("alterar_senha.html") @app.route('/usuarios//toggle_status', methods=['POST']) @require_login def toggle_user_status(user_id): user = db_session.query(Usuario).get_or_404(user_id) # Verificar permissões baseado na hierarquia if not current_user.has_permission('system_config'): if current_user.has_permission('manage_cr_sectors'): # Secretário de CR só pode gerenciar membros do seu CR if user.cr_id != current_user.cr_id: flash('Você não tem permissão para gerenciar este usuário.', 'danger') return redirect(url_for('dashboard_admin')) elif current_user.has_permission('manage_sector_cells'): # Secretário de Setor só pode gerenciar membros do seu setor if user.setor_id != current_user.setor_id: flash('Você não tem permissão para gerenciar este usuário.', 'danger') return redirect(url_for('dashboard_admin')) elif current_user.has_permission('manage_cell_members'): # Secretário de Célula só pode gerenciar membros da sua célula if user.celula_id != current_user.celula_id: flash('Você não tem permissão para gerenciar este usuário.', 'danger') return redirect(url_for('dashboard_admin')) else: # Militante básico não pode gerenciar ninguém flash('Você não tem permissão para gerenciar usuários.', 'danger') return redirect(url_for('dashboard_admin')) user.ativo = not user.ativo db_session.commit() return jsonify({'success': True, 'message': 'Status do usuário alterado com sucesso!'}) @app.route('/usuarios//alterar_nivel', methods=['POST']) @require_login def alterar_nivel(user_id): user = db_session.query(Usuario).get_or_404(user_id) novo_nivel = request.json.get('nivel') # Verificar permissões baseado na hierarquia if not current_user.has_permission('system_config'): if current_user.has_permission('manage_cr_sectors'): # Secretário de CR só pode alterar níveis dentro do seu CR if user.cr_id != current_user.cr_id: return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'}) elif current_user.has_permission('manage_sector_cells'): # Secretário de Setor só pode alterar níveis dentro do seu setor if user.setor_id != current_user.setor_id: return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'}) else: # Outros níveis não podem alterar níveis return jsonify({'success': False, 'message': 'Você não tem permissão para alterar níveis de usuários.'}) # Verificar se o novo nível é válido para o nível hierárquico do usuário atual if current_user.has_permission('system_config'): # Secretário Geral e Secretário de Organização podem alterar para qualquer nível pass elif current_user.has_permission('manage_cr_sectors'): # Secretário de CR só pode alterar para níveis do CR if novo_nivel not in ['membro_cr', 'secretario_cr']: return jsonify({'success': False, 'message': 'Nível inválido para este CR.'}) elif current_user.has_permission('manage_sector_cells'): # Secretário de Setor só pode alterar para níveis do setor if novo_nivel not in ['membro_setor', 'secretario_setor']: return jsonify({'success': False, 'message': 'Nível inválido para este setor.'}) # Atualizar o nível do usuário user.role = novo_nivel db_session.commit() return jsonify({'success': True, 'message': 'Nível do usuário alterado com sucesso!'}) @app.route('/usuarios//toggle_quadro_orientador', methods=['POST']) @require_login def toggle_quadro_orientador(user_id): db = get_db_connection() try: user = db.query(Usuario).get(user_id) if not user: return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404 # Verificar permissões if not (current_user.has_permission('system_config') or (current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or (current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)): return jsonify({'success': False, 'message': 'Você não tem permissão para alterar esta responsabilidade'}), 403 # Verificar se o usuário tem um militante associado if not user.militante: return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400 # Alternar o status de Quadro-Orientador user.militante.quadro_orientador = not user.militante.quadro_orientador # Atualizar a responsabilidade no campo responsabilidades if user.militante.quadro_orientador: user.militante.responsabilidades |= Militante.QUADRO_ORIENTADOR else: user.militante.responsabilidades &= ~Militante.QUADRO_ORIENTADOR db.commit() return jsonify({ 'success': True, 'message': f'Responsabilidade de Quadro-Orientador {"adicionada" if user.militante.quadro_orientador else "removida"} com sucesso' }) except Exception as e: db.rollback() return jsonify({'success': False, 'message': str(e)}), 500 finally: db.close() @app.route('/usuarios//toggle_aspirante', methods=['POST']) @require_login def toggle_aspirante(user_id): db = get_db_connection() try: user = db.query(Usuario).get(user_id) if not user: return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404 # Verificar permissões if not (current_user.has_permission('system_config') or (current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or (current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)): return jsonify({'success': False, 'message': 'Você não tem permissão para alterar este status'}), 403 # Verificar se o usuário tem um militante associado if not user.militante: return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400 # Se estiver tentando remover o status de aspirante if user.militante.aspirante: # Verificar se já passaram 3 meses if datetime.utcnow() - user.militante.data_inicio_aspirante < timedelta(days=90): return jsonify({ 'success': False, 'message': 'Não é possível remover o status de aspirante antes de 3 meses de integração' }), 400 # Verificar se há avaliação if not user.militante.avaliacao_aspirante: return jsonify({ 'success': False, 'message': 'É necessário registrar uma avaliação antes de remover o status de aspirante' }), 400 # Alternar o status de Aspirante user.militante.aspirante = not user.militante.aspirante # Atualizar a responsabilidade no campo responsabilidades if user.militante.aspirante: user.militante.responsabilidades |= Militante.ASPIRANTE user.militante.data_inicio_aspirante = datetime.utcnow() user.militante.avaliacao_aspirante = None user.militante.data_avaliacao_aspirante = None else: user.militante.responsabilidades &= ~Militante.ASPIRANTE user.militante.data_avaliacao_aspirante = datetime.utcnow() db.commit() return jsonify({ 'success': True, 'message': f'Status de Aspirante {"adicionado" if user.militante.aspirante else "removido"} com sucesso' }) except Exception as e: db.rollback() return jsonify({'success': False, 'message': str(e)}), 500 finally: db.close() @app.route('/usuarios//avaliar_aspirante', methods=['POST']) @require_login def avaliar_aspirante(user_id): db = get_db_connection() try: user = db.query(Usuario).get(user_id) if not user: return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404 # Verificar permissões if not (current_user.has_permission('system_config') or (current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or (current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)): return jsonify({'success': False, 'message': 'Você não tem permissão para avaliar este aspirante'}), 403 # Verificar se o usuário tem um militante associado e é aspirante if not user.militante or not user.militante.aspirante: return jsonify({'success': False, 'message': 'Usuário não é um aspirante'}), 400 # Verificar se já passaram 3 meses if datetime.utcnow() - user.militante.data_inicio_aspirante < timedelta(days=90): return jsonify({ 'success': False, 'message': 'Não é possível avaliar o aspirante antes de 3 meses de integração' }), 400 # Obter a avaliação do corpo da requisição avaliacao = request.json.get('avaliacao') if not avaliacao: return jsonify({'success': False, 'message': 'A avaliação é obrigatória'}), 400 # Atualizar a avaliação user.militante.avaliacao_aspirante = avaliacao user.militante.data_avaliacao_aspirante = datetime.utcnow() db.commit() return jsonify({ 'success': True, 'message': 'Avaliação registrada com sucesso' }) except Exception as e: db.rollback() return jsonify({'success': False, 'message': str(e)}), 500 finally: db.close() @app.template_filter('has_permission') def has_permission(value, permission): """Verifica se o valor contém a permissão especificada.""" return bool(value & permission) def create_app(): app = Flask(__name__) # ... existing code ... # ... existing code ... return app def init_system(): """Inicializa o sistema com todos os usuários necessários""" print("Inicializando sistema...") # Inicializar banco de dados print("Inicializando banco de dados...") init_database() # Criar admin create_admin() # Criar usuários de teste create_test_users() # Verificar configuração db = get_db_connection() try: # Verificar admin admin = db.query(Usuario).filter_by(username='admin').first() if admin: print("\nAdmin configurado:") print(f"Username: admin") print(f"Senha: admin123") if os.path.exists('admin_qr.png'): print("OTP: Usando configuração existente do arquivo admin_qr.png") else: print("OTP: Nova configuração gerada") # Verificar usuários de teste test_users = ['aligner', 'tester', 'deployer'] for username in test_users: user = db.query(Usuario).filter_by(username=username).first() if user: print(f"\nUsuário {username} configurado:") print(f"Username: {username}") print(f"Senha: Test123!@#") if user.otp_secret: print("OTP: Configurado") else: print("OTP: Não configurado") finally: db.close() print("\nInstruções:") print("1. Configure o OTP usando o QR code gerado") print("2. Faça login com as credenciais fornecidas") print("3. Altere a senha no primeiro login") print("\nCredenciais:") print("Admin:") print("Username: admin") print("Senha: admin123") print("\nUsuários de teste:") print("Username: aligner, tester, deployer") print("Senha: Test123!@#") @app.route('/militante/desligar/', methods=['POST']) @login_required def desligar_militante(id): """Desliga um militante e desativa seu usuário associado""" militante = db_session.query(Militante).get(id) if militante is None: flash('Militante não encontrado.', 'danger') return redirect(url_for('listar_militantes')) motivo = request.form.get('motivo') if not motivo: flash('É necessário informar o motivo do desligamento.', 'danger') return redirect(url_for('listar_militantes')) # Atualizar estado do militante militante.estado = EstadoMilitante.DESLIGADO.value militante.data_desligamento = datetime.utcnow() militante.motivo_desligamento = motivo # Desativar usuário associado se existir if militante.usuario: militante.usuario.ativo = False militante.usuario.ultimo_logout = datetime.utcnow() militante.usuario.motivo_logout = "Desligamento do militante" db_session.commit() flash('Militante desligado com sucesso.', 'success') return redirect(url_for('listar_militantes')) @app.route('/editar_pagamento/', methods=['GET', 'POST']) @require_login def editar_pagamento(id): user = current_user # Verificar permissões do usuário if not (user.has_permission(Permission.EDIT_CELL_PAYMENT) or user.has_permission(Permission.EDIT_SECTOR_PAYMENT) or user.has_permission(Permission.EDIT_CR_PAYMENT) or user.has_permission(Permission.EDIT_CC_PAYMENT)): flash('Você não tem permissão para editar pagamentos.', 'error') return redirect(url_for('home')) pagamento = db_session.query(Pagamento).get(id) if not pagamento: flash('Pagamento não encontrado.', 'error') return redirect(url_for('listar_pagamentos')) if request.method == 'POST': pagamento.valor = request.form['valor'] pagamento.data_pagamento = datetime.strptime(request.form['data_pagamento'], '%Y-%m-%d') db_session.commit() flash('Pagamento atualizado com sucesso!', 'success') return redirect(url_for('listar_pagamentos')) return render_template('editar_pagamento.html', pagamento=pagamento) @app.before_request def session_timeout(): """Verifica se a sessão expirou""" if current_user.is_authenticated: if 'last_activity' not in session: session['last_activity'] = time() return last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() # Se passaram mais de 30 minutos (configurável) timeout_minutes = 30 if now - last_activity > timedelta(minutes=timeout_minutes): # Registrar o logout por timeout try: current_user.ultimo_logout = datetime.now() current_user.motivo_logout = "Timeout de sessão" db_session.commit() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') return redirect(url_for('login')) # Atualizar timestamp de último acesso session['last_activity'] = time() # Atualizar também no banco de dados try: current_user.update_last_activity() db_session.commit() except Exception as e: print(f"Erro ao atualizar última atividade: {e}") if __name__ == '__main__': init_system() app.run(debug=True)