from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify, send_file from functions.database import ( Base, Militante, CotaMensal, TipoPagamento, Pagamento, MaterialVendido, TipoMaterial, VendaJornalAvulso, engine, AssinaturaAnual, RelatorioCotasMensais, RelatorioVendasMateriais, Usuario, get_db_connection, ComiteRegional, Setor, Celula, ComiteCentral, EmailMilitante, init_database, EstadoMilitante, ) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, joinedload from datetime import datetime, timedelta from flask_bootstrap import Bootstrap5 from functions.validations import validar_cpf from functools import wraps from pathlib import Path from time import time from flask_mail import Mail, Message import os from dotenv import load_dotenv from functions.rbac import Role, Permission, init_rbac from functions.decorators import require_permission, require_role, require_minimum_role, require_login, require_instance_permission, require_instance_access import re import secrets import pyotp import qrcode import base64 from io import BytesIO from create_admin import create_admin_user from create_test_users import create_test_users from werkzeug.security import generate_password_hash, check_password_hash from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user import random import string from sqlalchemy.sql import func load_dotenv() app = Flask(__name__) app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16)) bootstrap = Bootstrap5(app) # Configurar Flask-Login login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' # Adicionar filtros Jinja2 @app.template_filter('bitwise_and') def bitwise_and(value1, value2): """Filtro para operação bit a bit AND""" return value1 & value2 @login_manager.user_loader def load_user(user_id): """Carrega o usuário pelo ID""" db = get_db_connection() try: # Carregar o usuário com suas roles user = db.query(Usuario).options( joinedload(Usuario.roles) ).get(user_id) return user finally: db.close() def generate_qr_code(user): """Gera um QR code para o usuário""" if not user.otp_secret: user.otp_secret = pyotp.random_base32() totp = pyotp.TOTP(user.otp_secret) qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles")) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = BytesIO() img.save(buffer, format="PNG") qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8') return qr_code # Configuração da sessão do SQLAlchemy db_session = get_db_connection() # Configurar Flask-Mail app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER', 'smtp.gmail.com') app.config['MAIL_PORT'] = int(os.getenv('MAIL_PORT', 587)) app.config['MAIL_USE_TLS'] = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true' app.config['MAIL_USERNAME'] = os.getenv('MAIL_USERNAME') app.config['MAIL_PASSWORD'] = os.getenv('MAIL_PASSWORD') app.config['MAIL_DEFAULT_SENDER'] = os.getenv('MAIL_DEFAULT_SENDER') mail = Mail(app) # Inicializar banco de dados e RBAC print("Inicializando banco de dados...") init_database() print("Inicializando sistema RBAC...") init_rbac() # Criar admin e usuários de teste print("Criando usuários iniciais...") create_admin_user() create_test_users() # Decorator para verificar se o usuário está logado def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('Por favor, faça login para acessar esta página.', 'warning') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function # Decorator para verificar se a sessão expirou def session_timeout(f): @wraps(f) def decorated_function(*args, **kwargs): if current_user.is_authenticated: if 'last_activity' not in session: session['last_activity'] = time() return f(*args, **kwargs) last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() # Se passaram mais de 30 minutos (configurável) timeout_minutes = 30 if now - last_activity > timedelta(minutes=timeout_minutes): # Registrar o logout por timeout try: current_user.ultimo_logout = datetime.now() current_user.motivo_logout = "Timeout de sessão" db_session.commit() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') return redirect(url_for('login')) # Atualizar timestamp de último acesso session['last_activity'] = time() # Atualizar também no banco de dados try: current_user.update_last_activity() db_session.commit() except Exception as e: print(f"Erro ao atualizar última atividade: {e}") return f(*args, **kwargs) return f(*args, **kwargs) return decorated_function # Rota raiz - redireciona para login se não estiver autenticado @app.route("/") @require_login def index(): """Rota principal""" return redirect(url_for('home')) # Rota de login @app.route("/login", methods=["GET", "POST"]) def login(): """Rota de login""" if request.method == "POST": email_or_username = request.form.get("email") password = request.form.get("password") otp = request.form.get("otp") if not all([email_or_username, password]): flash("Email/usuário e senha são obrigatórios.", "danger") return redirect(url_for("login")) db = get_db_connection() try: # Tenta encontrar o usuário por email ou username user = db.query(Usuario).filter( (Usuario.email == email_or_username) | (Usuario.username == email_or_username) ).first() if not user or not user.check_password(password): flash("Email/usuário ou senha incorretos.", "danger") return redirect(url_for("login")) # Verificar OTP se o usuário tiver configurado if user.otp_secret and not otp: flash("Código OTP é obrigatório para sua conta.", "danger") return redirect(url_for("login")) if user.otp_secret and not user.verify_otp(otp): flash("Código OTP inválido.", "danger") return redirect(url_for("login")) # Atualizar último login user.ultimo_login = datetime.utcnow() db.commit() # Fazer login e setar sessão login_user(user) session['user_id'] = user.id session['username'] = user.username session['is_admin'] = user.is_admin # Redirecionar para home return redirect(url_for("home")) finally: db.close() return render_template("login.html") # Rota de logout @app.route("/logout") @login_required def logout(): db = get_db_connection() try: user = current_user if user: user.logout() db.commit() logout_user() finally: db.close() flash('Logout realizado com sucesso!', 'success') return redirect(url_for('login')) # Rota home (protegida) @app.route("/home") @require_login def home(): """Página inicial do sistema com dashboard""" db = get_db_connection() try: # Buscar nome do usuário usuario = db.query(Usuario).get(session.get('user_id')) nome_usuario = usuario.username if usuario else "Usuário" # Formatar data atual em português data_atual = datetime.now().strftime("%d de %B de %Y") # Buscar dados para o dashboard total_militantes = db.query(Militante).count() total_cotas = db.query(func.sum(CotaMensal.valor_novo)).scalar() or 0 total_materiais = db.query(MaterialVendido).count() total_assinaturas = db.query(AssinaturaAnual).count() # Buscar últimos militantes cadastrados ultimos_militantes = db.query(Militante)\ .order_by(Militante.id.desc())\ .limit(5)\ .all() # Buscar últimos pagamentos ultimos_pagamentos = db.query(Pagamento)\ .join(Militante)\ .order_by(Pagamento.data_pagamento.desc())\ .limit(5)\ .all() # Buscar tipos de pagamento tipos_pagamento = db.query(TipoPagamento).all() return render_template('home.html', nome_usuario=nome_usuario, data_atual=data_atual, total_militantes=total_militantes, total_cotas="{:.2f}".format(total_cotas), total_materiais=total_materiais, total_assinaturas=total_assinaturas, ultimos_militantes=ultimos_militantes, ultimos_pagamentos=ultimos_pagamentos, tipos_pagamento=tipos_pagamento) except Exception as e: print(f"Erro na página inicial: {e}") import traceback traceback.print_exc() flash('Erro ao carregar a página inicial', 'danger') return render_template('home.html', nome_usuario="Usuário", data_atual=datetime.now().strftime("%d/%m/%Y"), total_militantes=0, total_cotas="0.00", total_materiais=0, total_assinaturas=0, ultimos_militantes=[], ultimos_pagamentos=[]) finally: db.close() # Rota para criar um novo militante @app.route('/militantes/criar', methods=['GET', 'POST']) @require_login @require_permission('gerenciar_militantes') def criar_militante(): if request.method == 'POST': try: nome = request.form['nome'] email = request.form['email'] cpf = request.form['cpf'] titulo_eleitoral = request.form['titulo_eleitoral'] data_nascimento = datetime.strptime(request.form['data_nascimento'], '%Y-%m-%d').date() if request.form['data_nascimento'] else None data_entrada_oci = datetime.strptime(request.form['data_entrada_oci'], '%Y-%m-%d').date() if request.form['data_entrada_oci'] else None data_efetivacao_oci = datetime.strptime(request.form['data_efetivacao_oci'], '%Y-%m-%d').date() if request.form['data_efetivacao_oci'] else None telefone1 = request.form['telefone1'] telefone2 = request.form['telefone2'] profissao = request.form['profissao'] regime_trabalho = request.form['regime_trabalho'] empresa = request.form['empresa'] contratante = request.form['contratante'] instituicao_ensino = request.form['instituicao_ensino'] tipo_instituicao = request.form['tipo_instituicao'] sindicato = request.form['sindicato'] cargo_sindical = request.form['cargo_sindical'] dirigente_sindical = 'dirigente_sindical' in request.form central_sindical = request.form['central_sindical'] setor_id = request.form['setor_id'] celula_id = request.form['celula_id'] # Processar responsabilidades responsabilidades = 0 if 'responsabilidades' in request.form: for responsabilidade in request.form.getlist('responsabilidades'): responsabilidades |= int(responsabilidade) militante = Militante( nome=nome, email=email, cpf=cpf, titulo_eleitoral=titulo_eleitoral, data_nascimento=data_nascimento, data_entrada_oci=data_entrada_oci, data_efetivacao_oci=data_efetivacao_oci, telefone1=telefone1, telefone2=telefone2, profissao=profissao, regime_trabalho=regime_trabalho, empresa=empresa, contratante=contratante, instituicao_ensino=instituicao_ensino, tipo_instituicao=tipo_instituicao, sindicato=sindicato, cargo_sindical=cargo_sindical, dirigente_sindical=dirigente_sindical, central_sindical=central_sindical, setor_id=setor_id, celula_id=celula_id, responsabilidades=responsabilidades ) db_session.add(militante) db_session.commit() flash('Militante criado com sucesso!', 'success') return redirect(url_for('listar_militantes')) except Exception as e: db_session.rollback() flash(f'Erro ao criar militante: {str(e)}', 'danger') return redirect(url_for('criar_militante')) setores = Setor.query.all() celulas = Celula.query.all() return render_template('criar_militante.html', setores=setores, celulas=celulas) # Rota para listar militantes @app.route("/militantes") @require_login @require_permission(Permission.MANAGE_CELL_MEMBERS) def listar_militantes(): db = get_db_connection() try: militantes = db.query(Militante).all() celulas = db.query(Celula).order_by(Celula.nome).all() return render_template('listar_militantes.html', militantes=militantes, Militante=Militante, celulas=celulas) finally: db.close() # Rota para excluir militante @app.route("/militantes/excluir/", methods=["POST"]) @login_required @session_timeout def excluir_militante(id): try: db = get_db_connection() militante = db.query(Militante).get(id) if not militante: flash('Militante não encontrado', 'danger') return redirect(url_for('listar_militantes')) db.delete(militante) db.commit() flash('Militante excluído com sucesso!', 'success') except Exception as e: db.rollback() print(f"Erro ao excluir militante: {e}") flash('Erro ao excluir militante', 'danger') finally: db.close() return redirect(url_for('listar_militantes')) # Rota para criar uma nova cota @app.route("/cotas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def nova_cota(): if request.method == "POST": militante_id = request.form.get("militante_id") valor_antigo = float(request.form.get("valor_antigo")) valor_novo = float(request.form.get("valor_novo")) data_alteracao = datetime.strptime(request.form.get("data_alteracao"), "%Y-%m-%d").date() data_vencimento = datetime.strptime(request.form.get("data_vencimento"), "%Y-%m-%d").date() db = get_db_connection() try: cotas_mensais = CotaMensal( militante_id=militante_id, valor_antigo=valor_antigo, valor_novo=valor_novo, data_alteracao=data_alteracao, data_vencimento=data_vencimento, pago=False ) db.add(cotas_mensais) db.commit() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Cota cadastrada com sucesso!' }) flash('Cota cadastrada com sucesso!', 'success') return redirect(url_for('listar_cotas')) except Exception as e: db.rollback() print(f"Erro ao cadastrar cota: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': 'Erro ao cadastrar cota. Verifique os dados e tente novamente.' }), 400 flash('Erro ao cadastrar cota', 'danger') return render_template("nova_cota.html") finally: db.close() db = get_db_connection() try: militantes = db.query(Militante).order_by(Militante.nome).all() return render_template("nova_cota.html", militantes=militantes) finally: db.close() # Rota para listar cotas @app.route("/cotas") @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def listar_cotas(): try: print("Buscando cotas...") db = get_db_connection() cotas = db.query(CotaMensal)\ .join(Militante)\ .order_by(CotaMensal.data_vencimento.desc())\ .all() # Calcular status de cada cota for cota in cotas: if cota.pago: cota.status = "paga" elif cota.data_vencimento < datetime.now().date(): cota.status = "atrasada" else: cota.status = "pendente" # Buscar militantes para o modal de nova cota militantes = db.query(Militante).order_by(Militante.nome).all() return render_template("listar_cotas.html", cotas=cotas, militantes=militantes) except Exception as e: print(f"Erro ao listar cotas: {e}") flash('Erro ao listar cotas', 'danger') return render_template("listar_cotas.html", cotas=[], militantes=[]) finally: db.close() # Rota para editar cota @app.route('/cotas/editar/', methods=['GET', 'POST']) @login_required @session_timeout def editar_cota(id): db = get_db_connection() try: cota = db.query(CotaMensal).get_or_404(id) if request.method == 'POST': try: print("Dados recebidos:", request.form) cota.militante_id = int(request.form['militante_id']) cota.valor_antigo = float(request.form['valor_antigo']) cota.valor_novo = float(request.form['valor_novo']) cota.data_alteracao = datetime.strptime(request.form['data_alteracao'], '%Y-%m-%d').date() cota.data_vencimento = datetime.strptime(request.form['data_vencimento'], '%Y-%m-%d').date() # Processar o campo pago pago = request.form.get('pago', '').lower() print("Valor do campo pago recebido:", pago) cota.pago = pago == 'true' print("Status final do pago:", cota.pago) db.commit() print("Commit realizado com sucesso") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Cota atualizada com sucesso!' }) flash('Cota atualizada com sucesso!', 'success') return redirect(url_for('listar_cotas')) except Exception as e: db.rollback() print(f"Erro ao atualizar cota: {str(e)}") print(f"Tipo do erro: {type(e)}") import traceback traceback.print_exc() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': f'Erro ao atualizar cota: {str(e)}' }), 400 flash('Erro ao atualizar cota. Verifique os dados e tente novamente.', 'danger') return redirect(url_for('editar_cota', id=id)) return render_template('editar_cota.html', cota=cota) except Exception as e: print(f"Erro ao carregar cota: {str(e)}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': f'Erro ao carregar cota: {str(e)}' }), 404 flash('Erro ao carregar cota', 'danger') return redirect(url_for('listar_cotas')) finally: db.close() # Rota para criar um novo pagamento @app.route("/pagamentos/novo", methods=["GET", "POST"]) @require_login def novo_pagamento(): if request.method == "POST": militante_id = request.form.get("militante_id") tipo_pagamento_id = request.form.get("tipo_pagamento_id") valor = float(request.form.get("valor")) data_pagamento = datetime.strptime(request.form.get("data_pagamento"), "%Y-%m-%d").date() # Criar novo pagamento db = get_db_connection() try: pagamento = Pagamento( militante_id=militante_id, tipo_pagamento_id=tipo_pagamento_id, valor=valor, data_pagamento=data_pagamento ) db.add(pagamento) db.commit() flash('Pagamento cadastrado com sucesso!', 'success') return redirect(url_for('listar_pagamentos')) except Exception as e: db.rollback() print(f"Erro ao cadastrar pagamento: {e}") flash('Erro ao cadastrar pagamento', 'danger') return render_template("novo_pagamento.html") finally: db.close() # GET - Renderizar formulário db = get_db_connection() try: militantes = db.query(Militante).order_by(Militante.nome).all() tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all() return render_template("novo_pagamento.html", militantes=militantes, tipos_pagamento=tipos_pagamento) finally: db.close() # Rota para listar pagamentos @app.route("/pagamentos") @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def listar_pagamentos(): try: print("Buscando pagamentos...") db = get_db_connection() pagamentos = db.query(Pagamento)\ .join(Militante)\ .order_by(Pagamento.data_pagamento.desc())\ .all() militantes = db.query(Militante).order_by(Militante.nome).all() tipos_pagamento = db.query(TipoPagamento).all() return render_template("listar_pagamentos.html", pagamentos=pagamentos, militantes=militantes, tipos_pagamento=tipos_pagamento) except Exception as e: print(f"Erro ao listar pagamentos: {e}") flash('Erro ao listar pagamentos', 'danger') return render_template("listar_pagamentos.html", pagamentos=[], militantes=[]) finally: db.close() # Rota para adicionar pagamento @app.route("/pagamentos/adicionar", methods=["POST"]) @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def adicionar_pagamento(): if request.method == "POST": try: militante_id = request.form.get("militante_id") tipo_pagamento = request.form.get("tipo_pagamento") valor = float(request.form.get("valor")) data_pagamento = datetime.strptime(request.form.get("data_pagamento"), "%Y-%m-%d").date() db = get_db_connection() pagamento = Pagamento( militante_id=militante_id, tipo_pagamento=tipo_pagamento, valor=valor, data_pagamento=data_pagamento ) db.add(pagamento) db.commit() flash('Pagamento adicionado com sucesso!', 'success') except Exception as e: db.rollback() flash(f'Erro ao adicionar pagamento: {str(e)}', 'danger') finally: db.close() return redirect(url_for('listar_pagamentos')) # Rota para criar um novo material vendido @app.route("/materiais/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_material(): db = get_db_connection() try: militante_id = request.form.get('militante_id') tipo_material_id = request.form.get('tipo_material_id') descricao = request.form.get('descricao') valor = float(request.form.get('valor')) data_venda = datetime.strptime(request.form.get('data_venda'), '%Y-%m-%d') material = MaterialVendido( militante_id=militante_id, tipo_material_id=tipo_material_id, descricao=descricao, valor=valor, data_venda=data_venda ) db.add(material) db.commit() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Material cadastrado com sucesso!' }) flash('Material cadastrado com sucesso!', 'success') return redirect(url_for('listar_materiais')) except Exception as e: db.rollback() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': 'Erro ao cadastrar material. Por favor, tente novamente.' }), 400 flash('Erro ao cadastrar material. Por favor, tente novamente.', 'error') return redirect(url_for('listar_materiais')) finally: db.close() # Rota para listar materiais vendidos @app.route("/materiais") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_materiais(): db = get_db_connection() try: materiais = db.query(MaterialVendido).join(Militante).join(TipoMaterial).all() militantes = db.query(Militante).all() tipos_material = db.query(TipoMaterial).all() return render_template('listar_materiais.html', materiais=materiais, militantes=militantes, tipos_material=tipos_material) finally: db.close() # Rota para criar uma nova venda de jornal @app.route("/jornais/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def nova_venda_jornal(): db = get_db_connection() try: militante_id = request.form.get('militante_id') quantidade = int(request.form.get('quantidade')) valor_total = float(request.form.get('valor_total')) data_venda = datetime.strptime(request.form.get('data_venda'), '%Y-%m-%d') venda = VendaJornalAvulso( militante_id=militante_id, quantidade=quantidade, valor_total=valor_total, data_venda=data_venda ) db.add(venda) db.commit() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Venda cadastrada com sucesso!' }) flash('Venda cadastrada com sucesso!', 'success') return redirect(url_for('listar_vendas_jornal')) except Exception as e: db.rollback() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': 'Erro ao cadastrar venda. Por favor, tente novamente.' }), 400 flash('Erro ao cadastrar venda. Por favor, tente novamente.', 'error') return redirect(url_for('listar_vendas_jornal')) finally: db.close() # Rota para listar vendas de jornal @app.route("/jornais") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_vendas_jornal(): db = get_db_connection() try: vendas = db.query(VendaJornalAvulso).join(Militante).all() militantes = db.query(Militante).all() return render_template('listar_vendas_jornal.html', vendas=vendas, militantes=militantes) finally: db.close() # Rota para criar um novo relatório de cotas @app.route("/relatorios/cotas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_relatorio_cotas(): if request.method == "POST": setor_id = request.form.get("setor_id") comite_id = request.form.get("comite_id") total_cotas = float(request.form.get("total_cotas")) data_relatorio = datetime.strptime(request.form.get("data_relatorio"), "%Y-%m-%d").date() db = get_db_connection() try: relatorio_cotas_mensais = RelatorioCotasMensais( setor_id=setor_id, comite_id=comite_id, total_cotas=total_cotas, data_relatorio=data_relatorio ) db.add(relatorio_cotas_mensais) db.commit() flash('Relatório de cotas cadastrado com sucesso!', 'success') return redirect(url_for('listar_relatorios_cotas')) except Exception as e: db.rollback() print(f"Erro ao cadastrar relatório de cotas: {e}") flash('Erro ao cadastrar relatório de cotas', 'danger') return render_template("novo_relatorio_cotas.html") finally: db.close() db = get_db_connection() try: setores = db.query(Setor).order_by(Setor.nome).all() comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all() return render_template("novo_relatorio_cotas.html", setores=setores, comites=comites) finally: db.close() # Rota para listar relatórios de cotas @app.route("/relatorios/cotas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_relatorios_cotas(): try: db = get_db_connection() relatorios = db.query(RelatorioCotasMensais).order_by(RelatorioCotasMensais.data_relatorio.desc()).all() return render_template("listar_relatorios_cotas.html", relatorios=relatorios) except Exception as e: print(f"Erro ao listar relatórios de cotas: {e}") flash('Erro ao listar relatórios de cotas', 'danger') return render_template("listar_relatorios_cotas.html", relatorios=[]) finally: db.close() # Rota para criar um novo relatório de vendas @app.route("/relatorios/vendas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_relatorio_vendas(): if request.method == "POST": setor_id = request.form.get("setor_id") comite_id = request.form.get("comite_id") total_vendas = float(request.form.get("total_vendas")) data_relatorio = datetime.strptime(request.form.get("data_relatorio"), "%Y-%m-%d").date() db = get_db_connection() try: relatorio_vendas_materiais = RelatorioVendasMateriais( setor_id=setor_id, comite_id=comite_id, total_vendas=total_vendas, data_relatorio=data_relatorio ) db.add(relatorio_vendas_materiais) db.commit() flash('Relatório de vendas cadastrado com sucesso!', 'success') return redirect(url_for('listar_relatorios_vendas')) except Exception as e: db.rollback() print(f"Erro ao cadastrar relatório de vendas: {e}") flash('Erro ao cadastrar relatório de vendas', 'danger') return render_template("novo_relatorio_vendas.html") finally: db.close() db = get_db_connection() try: setores = db.query(Setor).order_by(Setor.nome).all() comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all() return render_template("novo_relatorio_vendas.html", setores=setores, comites=comites) finally: db.close() # Rota para listar relatórios de vendas @app.route("/relatorios/vendas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_relatorios_vendas(): try: db = get_db_connection() relatorios = db.query(RelatorioVendasMateriais).order_by(RelatorioVendasMateriais.data_relatorio.desc()).all() return render_template("listar_relatorios_vendas.html", relatorios=relatorios) except Exception as e: print(f"Erro ao listar relatórios de vendas: {e}") flash('Erro ao listar relatórios de vendas', 'danger') return render_template("listar_relatorios_vendas.html", relatorios=[]) finally: db.close() # Rota para editar militante @app.route("/militantes/editar/", methods=["GET", "POST"]) @login_required @session_timeout def editar_militante(id): db = get_db_connection() try: militante = db.query(Militante).get(id) if not militante: if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'status': 'error', 'message': 'Militante não encontrado'}), 404 flash('Militante não encontrado', 'danger') return redirect(url_for('listar_militantes')) if request.method == "POST": nome = request.form.get("nome") cpf = request.form.get("cpf") email = request.form.get("email") telefone = request.form.get("telefone") endereco = request.form.get("endereco") filiado = request.form.get("filiado") == "on" # Validar CPF if not validar_cpf(cpf): if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'status': 'error', 'message': 'CPF inválido'}), 400 flash('CPF inválido', 'danger') return render_template('editar_militante.html', militante=militante) # Verificar se já existe outro militante com este CPF militante_existente = db.query(Militante).filter( Militante.cpf == cpf, Militante.id != id ).first() if militante_existente: if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'status': 'error', 'message': 'CPF já cadastrado para outro militante'}), 400 flash('CPF já cadastrado para outro militante', 'danger') return render_template('editar_militante.html', militante=militante) try: militante.nome = nome militante.cpf = cpf militante.email = email militante.telefone = telefone militante.endereco = endereco militante.filiado = filiado db.commit() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Militante atualizado com sucesso', 'militante': { 'id': militante.id, 'nome': militante.nome, 'cpf': militante.cpf, 'email': militante.email, 'telefone': militante.telefone, 'endereco': militante.endereco, 'filiado': militante.filiado } }) flash('Militante atualizado com sucesso!', 'success') return redirect(url_for('listar_militantes')) except Exception as e: db.rollback() print(f"Erro ao atualizar militante: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'status': 'error', 'message': 'Erro ao atualizar militante'}), 500 flash('Erro ao atualizar militante', 'danger') return render_template('editar_militante.html', militante=militante) return render_template('editar_militante.html', militante=militante) except Exception as e: print(f"Erro ao editar militante: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({'status': 'error', 'message': 'Erro ao carregar militante'}), 500 flash('Erro ao carregar militante', 'danger') return redirect(url_for('listar_militantes')) finally: db.close() # Rota para criar um novo usuário @app.route("/usuarios/novo", methods=["GET", "POST"]) @login_required def novo_usuario(): if request.method == "POST": username = request.form.get("username") password = request.form.get("password") email = request.form.get("email") role_id = request.form.get("role_id") setor_id = request.form.get("setor_id") # Verificar se usuário já existe db = get_db_connection() try: if db.query(Usuario).filter_by(username=username).first(): flash('Nome de usuário já existe.', 'danger') return render_template("novo_usuario.html") novo_usuario = Usuario( username=username, email=email, role_id=role_id, setor_id=setor_id ) novo_usuario.set_password(password) novo_usuario.otp_secret = pyotp.random_base32() db.add(novo_usuario) db.commit() flash('Usuário cadastrado com sucesso!', 'success') return redirect(url_for('listar_usuarios')) except Exception as e: db.rollback() print(f"Erro ao cadastrar usuário: {e}") flash('Erro ao cadastrar usuário', 'danger') return render_template("novo_usuario.html") finally: db.close() db = get_db_connection() try: roles = db.query(Role).order_by(Role.nome).all() setores = db.query(Setor).order_by(Setor.nome).all() return render_template("novo_usuario.html", roles=roles, setores=setores) finally: db.close() # Rota para verificar status da sessão @app.route('/check_session') def check_session(): if 'user_id' not in session: return jsonify({'status': 'expired'}) if 'last_activity' in session: last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() if now - last_activity > timedelta(hours=2): # Registrar o logout por timeout try: db = get_db_connection() user = db.query(Usuario).get(session.get('user_id')) if user: user.ultimo_logout = datetime.now() user.motivo_logout = "Timeout de sessão" db.commit() db.close() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() return jsonify({'status': 'expired'}) return jsonify({'status': 'active'}) @app.route("/qr/") def get_qr_code(token): db = get_db_connection() try: user = db.query(Usuario).filter_by(username='admin').first() if not user: flash('Usuário não encontrado', 'error') return redirect(url_for('login')) qr_uri = user.get_otp_uri() return render_template('mostrar_qr_code.html', qr_uri=qr_uri) finally: db.close() # Adicionar nova rota para API de setores @app.route("/api/setores/") @require_login def get_setores(cr_id): setores = db_session.query(Setor).filter_by(cr_id=cr_id).all() return jsonify({ 'setores': [{'id': s.id, 'nome': s.nome} for s in setores] }) @app.route('/celulas//militantes') @require_login def list_militantes_celula(celula_id): @require_instance_access('celula', celula_id) def decorated_function(celula_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(celula_id=celula_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(celula_id) @app.route('/setores//militantes') @require_login def list_militantes_setor(setor_id): @require_instance_access('setor', setor_id) def decorated_function(setor_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(setor_id=setor_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(setor_id) @app.route('/crs//militantes') @require_login def list_militantes_cr(cr_id): @require_instance_access('cr', cr_id) def decorated_function(cr_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(cr_id=cr_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(cr_id) @app.route('/celulas//pagamentos') @require_login def list_pagamentos_celula(celula_id): @require_instance_access('celula', celula_id) def decorated_function(celula_id): db = get_db_connection() try: pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) finally: db.close() return decorated_function(celula_id) @app.route('/setores//pagamentos') @require_login def list_pagamentos_setor(setor_id): @require_instance_access('setor', setor_id) def decorated_function(setor_id): db = get_db_connection() try: pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) finally: db.close() return decorated_function(setor_id) @app.route('/crs//pagamentos') @require_login def list_pagamentos_cr(cr_id): @require_instance_access('cr', cr_id) def decorated_function(cr_id): db = get_db_connection() try: pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) finally: db.close() return decorated_function(cr_id) @app.route('/celulas//pagamentos/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id') def novo_pagamento_celula(celula_id): if request.method == 'POST': db = get_db_connection() try: pagamento = Pagamento( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], celula_id=celula_id ) db.add(pagamento) db.commit() flash('Pagamento registrado com sucesso!', 'success') return redirect(url_for('list_pagamentos_celula', celula_id=celula_id)) finally: db.close() return render_template('pagamentos/form.html') @app.route('/setores//pagamentos/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id') def novo_pagamento_setor(setor_id): if request.method == 'POST': db = get_db_connection() try: pagamento = Pagamento( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], setor_id=setor_id ) db.add(pagamento) db.commit() flash('Pagamento registrado com sucesso!', 'success') return redirect(url_for('list_pagamentos_setor', setor_id=setor_id)) finally: db.close() return render_template('pagamentos/form.html') @app.route('/crs//pagamentos/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id') def novo_pagamento_cr(cr_id): if request.method == 'POST': db = get_db_connection() try: pagamento = Pagamento( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], cr_id=cr_id ) db.add(pagamento) db.commit() flash('Pagamento registrado com sucesso!', 'success') return redirect(url_for('list_pagamentos_cr', cr_id=cr_id)) finally: db.close() return render_template('pagamentos/form.html') @app.route("/alterar_senha", methods=["GET", "POST"]) @require_login def alterar_senha(): """Rota para alterar a senha do usuário""" if request.method == "POST": senha_atual = request.form.get("senha_atual") nova_senha = request.form.get("nova_senha") confirmar_senha = request.form.get("confirmar_senha") if not all([senha_atual, nova_senha, confirmar_senha]): flash("Todos os campos são obrigatórios.", "error") return redirect(url_for("alterar_senha")) if nova_senha != confirmar_senha: flash("As senhas não coincidem.", "error") return redirect(url_for("alterar_senha")) db = get_db_connection() try: user = db.query(Usuario).get(current_user.id) if not user.check_password(senha_atual): flash("Senha atual incorreta.", "error") return redirect(url_for("alterar_senha")) user.password_hash = generate_password_hash(nova_senha) db.commit() flash("Senha alterada com sucesso!", "success") return redirect(url_for("home")) finally: db.close() return render_template("alterar_senha.html") @app.route('/usuarios//toggle_status', methods=['POST']) @require_login def toggle_user_status(user_id): user = db_session.query(Usuario).get_or_404(user_id) # Verificar permissões baseado na hierarquia if not current_user.has_permission('system_config'): if current_user.has_permission('manage_cr_sectors'): # Secretário de CR só pode gerenciar membros do seu CR if user.cr_id != current_user.cr_id: flash('Você não tem permissão para gerenciar este usuário.', 'danger') return redirect(url_for('dashboard_admin')) elif current_user.has_permission('manage_sector_cells'): # Secretário de Setor só pode gerenciar membros do seu setor if user.setor_id != current_user.setor_id: flash('Você não tem permissão para gerenciar este usuário.', 'danger') return redirect(url_for('dashboard_admin')) elif current_user.has_permission('manage_cell_members'): # Secretário de Célula só pode gerenciar membros da sua célula if user.celula_id != current_user.celula_id: flash('Você não tem permissão para gerenciar este usuário.', 'danger') return redirect(url_for('dashboard_admin')) else: # Militante básico não pode gerenciar ninguém flash('Você não tem permissão para gerenciar usuários.', 'danger') return redirect(url_for('dashboard_admin')) user.ativo = not user.ativo db_session.commit() return jsonify({'success': True, 'message': 'Status do usuário alterado com sucesso!'}) @app.route('/usuarios//alterar_nivel', methods=['POST']) @require_login def alterar_nivel(user_id): user = db_session.query(Usuario).get_or_404(user_id) novo_nivel = request.json.get('nivel') # Verificar permissões baseado na hierarquia if not current_user.has_permission('system_config'): if current_user.has_permission('manage_cr_sectors'): # Secretário de CR só pode alterar níveis dentro do seu CR if user.cr_id != current_user.cr_id: return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'}) elif current_user.has_permission('manage_sector_cells'): # Secretário de Setor só pode alterar níveis dentro do seu setor if user.setor_id != current_user.setor_id: return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'}) else: # Outros níveis não podem alterar níveis return jsonify({'success': False, 'message': 'Você não tem permissão para alterar níveis de usuários.'}) # Verificar se o novo nível é válido para o nível hierárquico do usuário atual if current_user.has_permission('system_config'): # Secretário Geral e Secretário de Organização podem alterar para qualquer nível pass elif current_user.has_permission('manage_cr_sectors'): # Secretário de CR só pode alterar para níveis do CR if novo_nivel not in ['membro_cr', 'secretario_cr']: return jsonify({'success': False, 'message': 'Nível inválido para este CR.'}) elif current_user.has_permission('manage_sector_cells'): # Secretário de Setor só pode alterar para níveis do setor if novo_nivel not in ['membro_setor', 'secretario_setor']: return jsonify({'success': False, 'message': 'Nível inválido para este setor.'}) # Atualizar o nível do usuário user.role = novo_nivel db_session.commit() return jsonify({'success': True, 'message': 'Nível do usuário alterado com sucesso!'}) @app.route('/usuarios//toggle_quadro_orientador', methods=['POST']) @require_login def toggle_quadro_orientador(user_id): db = get_db_connection() try: user = db.query(Usuario).get(user_id) if not user: return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404 # Verificar permissões if not (current_user.has_permission('system_config') or (current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or (current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)): return jsonify({'success': False, 'message': 'Você não tem permissão para alterar esta responsabilidade'}), 403 # Verificar se o usuário tem um militante associado if not user.militante: return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400 # Alternar o status de Quadro-Orientador user.militante.quadro_orientador = not user.militante.quadro_orientador # Atualizar a responsabilidade no campo responsabilidades if user.militante.quadro_orientador: user.militante.responsabilidades |= Militante.QUADRO_ORIENTADOR else: user.militante.responsabilidades &= ~Militante.QUADRO_ORIENTADOR db.commit() return jsonify({ 'success': True, 'message': f'Responsabilidade de Quadro-Orientador {"adicionada" if user.militante.quadro_orientador else "removida"} com sucesso' }) except Exception as e: db.rollback() return jsonify({'success': False, 'message': str(e)}), 500 finally: db.close() @app.route("/cotas/excluir/", methods=["POST"]) @login_required @session_timeout def excluir_cota(id): """Exclui uma cota mensal""" db = get_db_connection() try: cota = db.query(CotaMensal).get(id) if not cota: flash('Cota não encontrada.', 'danger') return redirect(url_for('listar_cotas')) # Excluir a cota db.delete(cota) db.commit() flash('Cota excluída com sucesso!', 'success') except Exception as e: db.rollback() flash('Erro ao excluir cota. Por favor, tente novamente.', 'danger') print(f"Erro ao excluir cota: {e}") finally: db.close() return redirect(url_for('listar_cotas')) @app.route("/assinaturas") @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def listar_assinaturas(): db = get_db_connection() try: assinaturas = db.query(AssinaturaAnual).join(Militante).all() militantes = db.query(Militante).all() return render_template('listar_assinaturas.html', assinaturas=assinaturas, militantes=militantes) finally: db.close() @app.route("/assinaturas/novo", methods=['POST']) @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def nova_assinatura(): db = get_db_connection() try: data = request.form # Validar dados if not all(k in data for k in ['militante_id', 'data_inicio', 'data_fim', 'valor']): return jsonify({'success': False, 'message': 'Todos os campos são obrigatórios'}) # Criar nova assinatura assinatura = AssinaturaAnual( militante_id=data['militante_id'], data_inicio=datetime.strptime(data['data_inicio'], '%Y-%m-%d'), data_fim=datetime.strptime(data['data_fim'], '%Y-%m-%d'), valor=float(data['valor']) ) db.add(assinatura) db.commit() return jsonify({'success': True}) except Exception as e: db.rollback() return jsonify({'success': False, 'message': str(e)}) finally: db.close() @app.route("/assinaturas/excluir/", methods=['POST']) @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def excluir_assinatura(id): db = get_db_connection() try: assinatura = db.query(AssinaturaAnual).get(id) if not assinatura: return jsonify({'success': False, 'message': 'Assinatura não encontrada'}) db.delete(assinatura) db.commit() return jsonify({'success': True}) except Exception as e: db.rollback() return jsonify({'success': False, 'message': str(e)}) finally: db.close() def create_app(): app = Flask(__name__) # ... existing code ... # ... existing code ... return app def init_system(): """Inicializa o sistema com todos os usuários necessários""" print("Inicializando sistema...") # Inicializar banco de dados print("Inicializando banco de dados...") init_database() # Criar admin create_admin_user() # Criar usuários de teste create_test_users() # Verificar configuração db = get_db_connection() try: # Verificar admin admin = db.query(Usuario).filter_by(username='admin').first() if admin: print("\nAdmin configurado:") print(f"Username: admin") print(f"Senha: admin123") if os.path.exists('admin_qr.png'): print("OTP: Usando configuração existente do arquivo admin_qr.png") else: print("OTP: Nova configuração gerada") # Verificar usuários de teste test_users = ['aligner', 'tester', 'deployer'] for username in test_users: user = db.query(Usuario).filter_by(username=username).first() if user: print(f"\nUsuário {username} configurado:") print(f"Username: {username}") print(f"Senha: Test123!@#") if user.otp_secret: print("OTP: Configurado") else: print("OTP: Não configurado") finally: db.close() print("\nInstruções:") print("1. Configure o OTP usando o QR code gerado") print("2. Faça login com as credenciais fornecidas") print("3. Altere a senha no primeiro login") print("\nCredenciais:") print("Admin:") print("Username: admin") print("Senha: admin123") print("\nUsuários de teste:") print("Username: aligner, tester, deployer") print("Senha: Test123!@#") if __name__ == '__main__': init_system() app.run(debug=True)