from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify, send_file from functions.database import ( Base, Militante, CotaMensal, TipoPagamento, Pagamento, MaterialVendido, TipoMaterial, VendaJornalAvulso, engine, AssinaturaAnual, RelatorioCotasMensais, RelatorioVendasMateriais, Usuario, get_db_connection, ComiteRegional, Setor, Celula, ComiteCentral, EmailMilitante, init_database, EstadoMilitante, Endereco, TipoComprovante, Comprovante, CampanhaFinanceira, TransacaoPIX, Permission, Role, Atividade, MaterialAtividade, Relatorio, CentralizacaoComprovante ) from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, joinedload from datetime import datetime, timedelta, date from flask_bootstrap import Bootstrap5 from functions.validations import validar_cpf from functools import wraps from pathlib import Path from time import time from flask_mail import Mail, Message import os from dotenv import load_dotenv from functions.rbac import init_rbac from functions.decorators import require_permission, require_role, require_minimum_role, require_login, require_instance_permission, require_instance_access import re import secrets import pyotp import qrcode import base64 from io import BytesIO from create_admin import create_admin_user from werkzeug.security import generate_password_hash, check_password_hash from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user import random import string from sqlalchemy.sql import func from flask_wtf.csrf import CSRFProtect import json from utils.date_utils import validar_data, converter_data, validar_sequencia_datas, calcular_idade load_dotenv() def create_app(): app = Flask(__name__) app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16)) bootstrap = Bootstrap5(app) # Configurar CSRF Protection csrf = CSRFProtect() csrf.init_app(app) # Configurar Flask-Login login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' # Adicionar filtros Jinja2 @app.template_filter('bitwise_and') def bitwise_and(value1, value2): """Filtro para operação bit a bit AND""" return value1 & value2 @app.before_request def before_request(): """Configurações antes de cada requisição""" session.permanent = True app.permanent_session_lifetime = timedelta(minutes=30) session.modified = True @login_manager.user_loader def load_user(user_id): """Carrega o usuário pelo ID""" db = get_db_connection() try: # Carregar o usuário com suas roles user = db.query(Usuario).options( joinedload(Usuario.roles) ).get(user_id) return user finally: db.close() def generate_qr_code(user): """Gera um QR code para o usuário""" if not user.otp_secret: user.otp_secret = pyotp.random_base32() totp = pyotp.TOTP(user.otp_secret) qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles")) qr.make(fit=True) img = qr.make_image(fill_color="black", back_color="white") buffer = BytesIO() img.save(buffer, format="PNG") qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8') return qr_code # Configurar Flask-Mail app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER', 'smtp.gmail.com') app.config['MAIL_PORT'] = int(os.getenv('MAIL_PORT', 587)) app.config['MAIL_USE_TLS'] = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true' app.config['MAIL_USERNAME'] = os.getenv('MAIL_USERNAME') app.config['MAIL_PASSWORD'] = os.getenv('MAIL_PASSWORD') app.config['MAIL_DEFAULT_SENDER'] = os.getenv('MAIL_DEFAULT_SENDER') mail = Mail(app) # Configuração da sessão do SQLAlchemy db_session = get_db_connection() # Decorator para verificar se o usuário está logado def login_required(f): @wraps(f) def decorated_function(*args, **kwargs): if 'user_id' not in session: flash('Por favor, faça login para acessar esta página.', 'warning') return redirect(url_for('login')) return f(*args, **kwargs) return decorated_function # Decorator para verificar se a sessão expirou def session_timeout(f): @wraps(f) def decorated_function(*args, **kwargs): if current_user.is_authenticated: if 'last_activity' not in session: session['last_activity'] = time() return f(*args, **kwargs) last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() # Se passaram mais de 30 minutos (configurável) timeout_minutes = 30 if now - last_activity > timedelta(minutes=timeout_minutes): # Registrar o logout por timeout try: current_user.ultimo_logout = datetime.now() current_user.motivo_logout = "Timeout de sessão" db_session.commit() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') return redirect(url_for('login')) # Atualizar timestamp de último acesso session['last_activity'] = time() # Atualizar também no banco de dados try: current_user.update_last_activity() db_session.commit() except Exception as e: print(f"Erro ao atualizar última atividade: {e}") return f(*args, **kwargs) return f(*args, **kwargs) return decorated_function # Rota raiz - redireciona para login se não estiver autenticado @app.route("/") @require_login def index(): """Rota principal""" return redirect(url_for('home')) # Rota de login @app.route("/login", methods=["GET", "POST"]) def login(): """Rota de login""" if request.method == "POST": email_or_username = request.form.get("email") password = request.form.get("password") otp = request.form.get("otp") if not all([email_or_username, password]): flash("Email/usuário e senha são obrigatórios.", "danger") return redirect(url_for("login")) db = get_db_connection() try: # Tenta encontrar o usuário por email ou username user = db.query(Usuario).filter( (Usuario.email == email_or_username) | (Usuario.username == email_or_username) ).first() if not user or not user.check_password(password): flash("Email/usuário ou senha incorretos.", "danger") return redirect(url_for("login")) # Verificar OTP apenas se o usuário tiver configurado if user.otp_secret: if not otp: flash("Código OTP é obrigatório para sua conta.", "danger") return redirect(url_for("login")) if not user.verify_otp(otp): flash("Código OTP inválido.", "danger") return redirect(url_for("login")) # Atualizar último login user.ultimo_login = datetime.utcnow() db.commit() # Fazer login e setar sessão login_user(user) session['user_id'] = user.id session['username'] = user.username session['is_admin'] = user.is_admin # Redirecionar para home return redirect(url_for("home")) finally: db.close() return render_template("login.html") # Rota de logout @app.route("/logout") @login_required def logout(): db = get_db_connection() try: user = current_user if user: user.logout() db.commit() logout_user() finally: db.close() flash('Logout realizado com sucesso!', 'success') return redirect(url_for('login')) # Rota home (protegida) @app.route("/home") @require_login def home(): """Página inicial do sistema com dashboard""" db = get_db_connection() try: # Buscar nome do usuário usuario = db.query(Usuario).get(session.get('user_id')) nome_usuario = usuario.username if usuario else "Usuário" # Formatar data atual em português data_atual = datetime.now().strftime("%d de %B de %Y") # Buscar dados para o dashboard total_militantes = db.query(Militante).count() total_cotas = db.query(func.sum(CotaMensal.valor_novo)).scalar() or 0 total_materiais = db.query(MaterialVendido).count() total_assinaturas = db.query(AssinaturaAnual).count() # Buscar últimos militantes cadastrados ultimos_militantes = db.query(Militante)\ .order_by(Militante.id.desc())\ .limit(5)\ .all() # Buscar últimos pagamentos ultimos_pagamentos = db.query(Pagamento)\ .order_by(Pagamento.data_pagamento.desc())\ .limit(5)\ .all() return render_template('home.html', nome_usuario=nome_usuario, data_atual=data_atual, total_militantes=total_militantes, total_cotas=total_cotas, total_materiais=total_materiais, total_assinaturas=total_assinaturas, ultimos_militantes=ultimos_militantes, ultimos_pagamentos=ultimos_pagamentos, Militante=Militante) except Exception as e: print(f"Erro na página inicial: {e}") import traceback traceback.print_exc() flash('Erro ao carregar a página inicial', 'danger') return render_template('home.html', nome_usuario="Usuário", data_atual=datetime.now().strftime("%d/%m/%Y"), total_militantes=0, total_cotas="0.00", total_materiais=0, total_assinaturas=0, ultimos_militantes=[], ultimos_pagamentos=[], Militante=Militante) finally: db.close() # Rota para criar um novo militante @app.route("/militantes/criar", methods=["POST"]) @require_login @require_permission('gerenciar_militantes') def criar_militante(): """Cria um novo militante""" db = get_db_connection() try: # Validar CPF cpf = request.form.get('cpf') if not validar_cpf(cpf): return jsonify({ 'status': 'error', 'message': 'CPF inválido' }), 400 # Verificar se já existe militante com este CPF militante_existente = db.query(Militante).filter(Militante.cpf == cpf).first() if militante_existente: return jsonify({ 'status': 'error', 'message': 'CPF já cadastrado' }), 400 # Criar endereço endereco = Endereco( cep=request.form.get('cep'), estado=request.form.get('estado'), cidade=request.form.get('cidade'), bairro=request.form.get('bairro'), logradouro=request.form.get('logradouro'), numero=request.form.get('numero'), complemento=request.form.get('complemento') ) db.add(endereco) db.flush() # Gerar ID do endereço # Criar militante militante = Militante( # Dados Básicos nome=request.form.get('nome'), cpf=cpf, titulo_eleitoral=request.form.get('titulo_eleitoral'), data_nascimento=datetime.strptime(request.form.get('data_nascimento'), '%Y-%m-%d') if request.form.get('data_nascimento') else None, data_entrada_oci=datetime.strptime(request.form.get('data_entrada_oci'), '%Y-%m-%d') if request.form.get('data_entrada_oci') else None, data_efetivacao_oci=datetime.strptime(request.form.get('data_efetivacao_oci'), '%Y-%m-%d') if request.form.get('data_efetivacao_oci') else None, # Contato telefone1=request.form.get('telefone1'), telefone2=request.form.get('telefone2'), endereco_id=endereco.id, # Profissional profissao=request.form.get('profissao'), regime_trabalho=request.form.get('regime_trabalho'), empresa=request.form.get('empresa'), contratante=request.form.get('contratante'), # Acadêmico instituicao_ensino=request.form.get('instituicao_ensino'), tipo_instituicao=request.form.get('tipo_instituicao'), # Sindical sindicato=request.form.get('sindicato'), cargo_sindical=request.form.get('cargo_sindical'), central_sindical=request.form.get('central_sindical'), dirigente_sindical=request.form.get('dirigente_sindical') == 'on', # Organização estado=EstadoMilitante(request.form.get('estado', 'ATIVO')), celula_id=request.form.get('celula_id', type=int), responsabilidades=request.form.get('responsabilidades', type=int, default=0), # Por padrão, todo novo militante é aspirante aspirante=True, data_inicio_aspirante=datetime.now() ) db.add(militante) db.flush() # Gerar ID do militante # Criar email principal email = request.form.get('email') if email: if militante.emails: militante.emails[0].endereco_email = email else: novo_email = EmailMilitante( endereco_email=email, militante_id=militante.id ) db.add(novo_email) militante.emails.append(novo_email) db.commit() return jsonify({ 'status': 'success', 'message': 'Militante criado com sucesso!' }) except Exception as e: db.rollback() return jsonify({ 'status': 'error', 'message': f'Erro ao criar militante: {str(e)}' }), 500 finally: db.close() # Rota para listar militantes @app.route("/militantes") @require_login @require_permission(Permission.MANAGE_CELL_MEMBERS) def listar_militantes(): db = get_db_connection() try: militantes = db.query(Militante).order_by(Militante.nome).all() celulas = db.query(Celula).order_by(Celula.nome).all() return render_template('listar_militantes.html', militantes=militantes, Militante=Militante, celulas=celulas) finally: db.close() # Rota para excluir militante @app.route("/militantes/excluir/", methods=["POST"]) @login_required @session_timeout def excluir_militante(id): try: db = get_db_connection() militante = db.query(Militante).get(id) if not militante: flash('Militante não encontrado', 'danger') return redirect(url_for('listar_militantes')) db.delete(militante) db.commit() flash('Militante excluído com sucesso!', 'success') except Exception as e: db.rollback() print(f"Erro ao excluir militante: {e}") flash('Erro ao excluir militante', 'danger') finally: db.close() return redirect(url_for('listar_militantes')) # Rota para criar uma nova cota @app.route("/cotas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def nova_cota(): if request.method == "POST": militante_id = request.form.get("militante_id") valor_antigo = float(request.form.get("valor_antigo")) valor_novo = float(request.form.get("valor_novo")) data_alteracao = datetime.strptime(request.form.get("data_alteracao"), "%Y-%m-%d").date() data_vencimento = datetime.strptime(request.form.get("data_vencimento"), "%Y-%m-%d").date() db = get_db_connection() try: cotas_mensais = CotaMensal( militante_id=militante_id, valor_antigo=valor_antigo, valor_novo=valor_novo, data_alteracao=data_alteracao, data_vencimento=data_vencimento, pago=False ) db.add(cotas_mensais) db.commit() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Cota cadastrada com sucesso!' }) flash('Cota cadastrada com sucesso!', 'success') return redirect(url_for('listar_cotas')) except Exception as e: db.rollback() print(f"Erro ao cadastrar cota: {e}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': 'Erro ao cadastrar cota. Verifique os dados e tente novamente.' }), 400 flash('Erro ao cadastrar cota', 'danger') return render_template("nova_cota.html") finally: db.close() db = get_db_connection() try: militantes = db.query(Militante).order_by(Militante.nome).all() return render_template("nova_cota.html", militantes=militantes) finally: db.close() # Rota para listar cotas @app.route("/cotas") @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def listar_cotas(): try: print("Buscando cotas...") db = get_db_connection() cotas = db.query(CotaMensal)\ .join(Militante)\ .order_by(CotaMensal.data_vencimento.desc())\ .all() # Calcular status de cada cota for cota in cotas: if cota.pago: cota.status = "paga" elif cota.data_vencimento < datetime.now().date(): cota.status = "atrasada" else: cota.status = "pendente" # Buscar militantes para o modal de nova cota militantes = db.query(Militante).order_by(Militante.nome).all() return render_template("listar_cotas.html", cotas=cotas, militantes=militantes) except Exception as e: print(f"Erro ao listar cotas: {e}") flash('Erro ao listar cotas', 'danger') return render_template("listar_cotas.html", cotas=[], militantes=[]) finally: db.close() # Rota para editar cota @app.route('/cotas/editar/', methods=['GET', 'POST']) @login_required @session_timeout def editar_cota(id): db = get_db_connection() try: cota = db.query(CotaMensal).get_or_404(id) if request.method == 'POST': try: print("Dados recebidos:", request.form) cota.militante_id = int(request.form['militante_id']) cota.valor_antigo = float(request.form['valor_antigo']) cota.valor_novo = float(request.form['valor_novo']) cota.data_alteracao = datetime.strptime(request.form['data_alteracao'], '%Y-%m-%d').date() cota.data_vencimento = datetime.strptime(request.form['data_vencimento'], '%Y-%m-%d').date() # Processar o campo pago pago = request.form.get('pago', '').lower() print("Valor do campo pago recebido:", pago) cota.pago = pago == 'true' print("Status final do pago:", cota.pago) db.commit() print("Commit realizado com sucesso") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'success', 'message': 'Cota atualizada com sucesso!' }) flash('Cota atualizada com sucesso!', 'success') return redirect(url_for('listar_cotas')) except Exception as e: db.rollback() print(f"Erro ao atualizar cota: {str(e)}") print(f"Tipo do erro: {type(e)}") import traceback traceback.print_exc() if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': f'Erro ao atualizar cota: {str(e)}' }), 400 flash('Erro ao atualizar cota. Verifique os dados e tente novamente.', 'danger') return redirect(url_for('editar_cota', id=id)) return render_template('editar_cota.html', cota=cota) except Exception as e: print(f"Erro ao carregar cota: {str(e)}") if request.headers.get('X-Requested-With') == 'XMLHttpRequest': return jsonify({ 'status': 'error', 'message': f'Erro ao carregar cota: {str(e)}' }), 404 flash('Erro ao carregar cota', 'danger') return redirect(url_for('listar_cotas')) finally: db.close() # Rota para criar um novo pagamento @app.route("/pagamentos/novo", methods=["GET", "POST"]) @require_login def novo_pagamento(): if request.method == "POST": try: militante_id = request.form.get("militante_id") tipo_pagamento_id = request.form.get("tipo_pagamento_id") valor = float(request.form.get("valor")) data_pagamento = converter_data(request.form.get("data_pagamento")) if not validar_data(data_pagamento): flash('Data de pagamento inválida ou futura', 'danger') return redirect(url_for('novo_pagamento')) db = get_db_connection() pagamento = Pagamento( militante_id=militante_id, tipo_pagamento_id=tipo_pagamento_id, valor=valor, data_pagamento=data_pagamento ) db.add(pagamento) db.commit() flash('Pagamento cadastrado com sucesso!', 'success') return redirect(url_for('listar_pagamentos')) except Exception as e: db.rollback() app.logger.error(f"Erro ao cadastrar pagamento: {e}") flash('Erro ao cadastrar pagamento', 'danger') return redirect(url_for('novo_pagamento')) finally: db.close() # GET - Renderizar formulário db = get_db_connection() try: militantes = db.query(Militante).order_by(Militante.nome).all() tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all() return render_template("novo_pagamento.html", militantes=militantes, tipos_pagamento=tipos_pagamento) finally: db.close() # Rota para listar pagamentos @app.route("/pagamentos") @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def listar_pagamentos(): try: print("Buscando pagamentos...") db = get_db_connection() pagamentos = db.query(Pagamento)\ .join(Militante)\ .order_by(Pagamento.data_pagamento.desc())\ .all() militantes = db.query(Militante).order_by(Militante.nome).all() tipos_pagamento = db.query(TipoPagamento).all() return render_template("listar_pagamentos.html", pagamentos=pagamentos, militantes=militantes, tipos_pagamento=tipos_pagamento) except Exception as e: print(f"Erro ao listar pagamentos: {e}") flash('Erro ao listar pagamentos', 'danger') return render_template("listar_pagamentos.html", pagamentos=[], militantes=[]) finally: db.close() # Rota para adicionar pagamento @app.route("/pagamentos/adicionar", methods=["POST"]) @require_login @require_permission(Permission.MANAGE_CELL_REPORTS) def adicionar_pagamento(): if request.method == "POST": try: militante_id = request.form.get("militante_id") tipo_pagamento = request.form.get("tipo_pagamento") valor = float(request.form.get("valor")) data_pagamento = converter_data(request.form.get("data_pagamento")) db = get_db_connection() pagamento = Pagamento( militante_id=militante_id, tipo_pagamento=tipo_pagamento, valor=valor, data_pagamento=data_pagamento ) db.add(pagamento) db.commit() flash('Pagamento adicionado com sucesso!', 'success') except Exception as e: db.rollback() flash(f'Erro ao adicionar pagamento: {str(e)}', 'danger') finally: db.close() return redirect(url_for('listar_pagamentos')) # Rota para criar um novo material vendido @app.route("/materiais/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_MATERIALS) def novo_material(): if request.method == "POST": try: nome = request.form.get("nome") descricao = request.form.get("descricao") preco = float(request.form.get("preco")) quantidade = int(request.form.get("quantidade")) if not all([nome, descricao, preco, quantidade]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("novo_material")) db = get_db_connection() material = MaterialVendido(nome=nome, valor=valor) db.add(material) db.commit() flash("Material cadastrado com sucesso", "success") return redirect(url_for("listar_materiais")) except Exception as e: flash(f"Erro ao cadastrar material: {str(e)}", "danger") return redirect(url_for("novo_material")) return render_template("novo_material.html") @app.route("/materiais") @require_login @require_permission(Permission.MANAGE_MATERIALS) def listar_materiais(): db = get_db_connection() materiais = db.query(MaterialVendido).all() return render_template("listar_materiais.html", materiais=materiais) @app.route("/materiais/vendidos/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_MATERIALS) def novo_material_vendido(): if request.method == "POST": try: militante_id = request.form.get("militante_id") material_id = request.form.get("material_id") quantidade = request.form.get("quantidade") data_venda = request.form.get("data_venda") if not all([militante_id, material_id, quantidade, data_venda]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("novo_material_vendido")) db = get_db_connection() material_vendido = MaterialVendido( militante_id=militante_id, material_id=material_id, quantidade=quantidade, data_venda=data_venda ) db.add(material_vendido) db.commit() flash("Material vendido registrado com sucesso", "success") return redirect(url_for("listar_materiais_vendidos")) except Exception as e: flash(f"Erro ao registrar material vendido: {str(e)}", "danger") return redirect(url_for("novo_material_vendido")) db = get_db_connection() militantes = db.query(Militante).all() materiais = db.query(MaterialVendido).all() return render_template("novo_material_vendido.html", militantes=militantes, materiais=materiais) @app.route("/materiais/vendidos") @require_login @require_permission(Permission.MANAGE_MATERIALS) def listar_materiais_vendidos(): db = get_db_connection() materiais_vendidos = db.query(MaterialVendido).all() return render_template("listar_materiais_vendidos.html", materiais_vendidos=materiais_vendidos) # Rota para criar uma nova venda de jornal @app.route("/vendas/jornal/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_MATERIALS) def nova_venda_jornal(): if request.method == "POST": try: militante_id = request.form.get("militante_id") quantidade = request.form.get("quantidade") data_venda = request.form.get("data_venda") if not all([militante_id, quantidade, data_venda]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("nova_venda_jornal")) db = get_db_connection() venda_jornal = VendaJornalAvulso( militante_id=militante_id, quantidade=quantidade, data_venda=data_venda ) db.add(venda_jornal) db.commit() flash("Venda de jornal registrada com sucesso", "success") return redirect(url_for("listar_vendas_jornal")) except Exception as e: flash(f"Erro ao registrar venda de jornal: {str(e)}", "danger") return redirect(url_for("nova_venda_jornal")) db = get_db_connection() militantes = db.query(Militante).all() return render_template("nova_venda_jornal.html", militantes=militantes) @app.route("/vendas/jornal") @require_login @require_permission(Permission.MANAGE_MATERIALS) def listar_vendas_jornal(): db = get_db_connection() vendas_jornal = db.query(VendaJornalAvulso).all() return render_template("listar_vendas_jornal.html", vendas_jornal=vendas_jornal) # Rota para criar um novo relatório de cotas @app.route("/relatorios/cotas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_REPORTS) def novo_relatorio_cotas(): if request.method == "POST": try: data_inicio = request.form.get("data_inicio") data_fim = request.form.get("data_fim") if not all([data_inicio, data_fim]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("novo_relatorio_cotas")) db = get_db_connection() cotas = db.query(CotaMensal).filter( CotaMensal.data_cota.between(data_inicio, data_fim) ).all() return render_template("relatorio_cotas.html", cotas=cotas, data_inicio=data_inicio, data_fim=data_fim) except Exception as e: flash(f"Erro ao gerar relatório: {str(e)}", "danger") return redirect(url_for("novo_relatorio_cotas")) return render_template("novo_relatorio_cotas.html") @app.route("/relatorios/jornais/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_REPORTS) def novo_relatorio_jornais(): if request.method == "POST": try: data_inicio = request.form.get("data_inicio") data_fim = request.form.get("data_fim") if not all([data_inicio, data_fim]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("novo_relatorio_jornais")) db = get_db_connection() jornais = db.query(VendaJornalAvulso).filter( VendaJornalAvulso.data_venda.between(data_inicio, data_fim) ).all() return render_template("relatorio_jornais.html", jornais=jornais, data_inicio=data_inicio, data_fim=data_fim) except Exception as e: flash(f"Erro ao gerar relatório: {str(e)}", "danger") return redirect(url_for("novo_relatorio_jornais")) return render_template("novo_relatorio_jornais.html") @app.route("/relatorios/assinaturas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_REPORTS) def novo_relatorio_assinaturas(): if request.method == "POST": try: data_inicio = request.form.get("data_inicio") data_fim = request.form.get("data_fim") if not all([data_inicio, data_fim]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("novo_relatorio_assinaturas")) db = get_db_connection() assinaturas = db.query(AssinaturaAnual).filter( AssinaturaAnual.data_assinatura.between(data_inicio, data_fim) ).all() return render_template("relatorio_assinaturas.html", assinaturas=assinaturas, data_inicio=data_inicio, data_fim=data_fim) except Exception as e: flash(f"Erro ao gerar relatório: {str(e)}", "danger") return redirect(url_for("novo_relatorio_assinaturas")) return render_template("novo_relatorio_assinaturas.html") @app.route("/relatorios/campanhas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.MANAGE_REPORTS) def novo_relatorio_campanhas(): if request.method == "POST": try: data_inicio = request.form.get("data_inicio") data_fim = request.form.get("data_fim") if not all([data_inicio, data_fim]): flash("Todos os campos são obrigatórios", "danger") return redirect(url_for("novo_relatorio_campanhas")) db = get_db_connection() campanhas = db.query(CampanhaFinanceira).filter( CampanhaFinanceira.data_campanha.between(data_inicio, data_fim) ).all() return render_template("relatorio_campanhas.html", campanhas=campanhas, data_inicio=data_inicio, data_fim=data_fim) except Exception as e: flash(f"Erro ao gerar relatório: {str(e)}", "danger") return redirect(url_for("novo_relatorio_campanhas")) return render_template("novo_relatorio_campanhas.html") # Rota para listar relatórios de cotas @app.route("/relatorios/cotas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_relatorios_cotas(): try: db = get_db_connection() relatorios = db.query(RelatorioCotasMensais).order_by(RelatorioCotasMensais.data_relatorio.desc()).all() return render_template("listar_relatorios_cotas.html", relatorios=relatorios) except Exception as e: print(f"Erro ao listar relatórios de cotas: {e}") flash('Erro ao listar relatórios de cotas', 'danger') return render_template("listar_relatorios_cotas.html", relatorios=[]) finally: db.close() # Rota para criar um novo relatório de vendas @app.route("/relatorios/vendas/novo", methods=["GET", "POST"]) @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def novo_relatorio_vendas(): if request.method == "POST": try: setor_id = request.form.get("setor_id") comite_id = request.form.get("comite_id") total_vendas = float(request.form.get("total_vendas")) data_relatorio = request.form.get("data_relatorio") # Validar data if not validar_data(data_relatorio): flash('Data do relatório inválida', 'danger') return render_template("novo_relatorio_vendas.html") # Converter data data_relatorio = converter_data(data_relatorio) # Validar data futura if data_relatorio > date.today(): flash('A data do relatório não pode ser futura', 'danger') return render_template("novo_relatorio_vendas.html") db = get_db_connection() try: relatorio_vendas_materiais = RelatorioVendasMateriais( setor_id=setor_id, comite_id=comite_id, total_vendas=total_vendas, data_relatorio=data_relatorio ) db.add(relatorio_vendas_materiais) db.commit() flash('Relatório de vendas cadastrado com sucesso!', 'success') return redirect(url_for('listar_relatorios_vendas')) except Exception as e: db.rollback() app.logger.error(f"Erro ao cadastrar relatório de vendas: {e}") flash('Erro ao cadastrar relatório de vendas', 'danger') return render_template("novo_relatorio_vendas.html") finally: db.close() except ValueError as e: flash(str(e), 'danger') return render_template("novo_relatorio_vendas.html") db = get_db_connection() try: setores = db.query(Setor).order_by(Setor.nome).all() comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all() return render_template("novo_relatorio_vendas.html", setores=setores, comites=comites, hoje=date.today().strftime('%Y-%m-%d')) finally: db.close() # Rota para listar relatórios de vendas @app.route("/relatorios/vendas") @require_login @require_permission(Permission.VIEW_CELL_REPORTS) def listar_relatorios_vendas(): try: db = get_db_connection() relatorios = db.query(RelatorioVendasMateriais).order_by(RelatorioVendasMateriais.data_relatorio.desc()).all() return render_template("listar_relatorios_vendas.html", relatorios=relatorios) except Exception as e: print(f"Erro ao listar relatórios de vendas: {e}") flash('Erro ao listar relatórios de vendas', 'danger') return render_template("listar_relatorios_vendas.html", relatorios=[]) finally: db.close() # Rota para editar militante @app.route('/militantes/editar/', methods=['POST']) @login_required @require_permission('gerenciar_militantes') def editar_militante(militante_id): try: db = get_db_connection() militante = db.query(Militante).get(militante_id) if not militante: return jsonify({ 'status': 'error', 'message': 'Militante não encontrado' }), 404 # Obter dados do formulário nome = request.form.get('nome') cpf = request.form.get('cpf') titulo_eleitoral = request.form.get('titulo_eleitoral') data_nascimento = request.form.get('data_nascimento') data_entrada_oci = request.form.get('data_entrada_oci') data_efetivacao_oci = request.form.get('data_efetivacao_oci') telefone1 = request.form.get('telefone1') telefone2 = request.form.get('telefone2') email = request.form.get('email') # Validar e converter datas try: data_nascimento = converter_data(data_nascimento) if data_nascimento else None data_entrada_oci = converter_data(data_entrada_oci) if data_entrada_oci else None data_efetivacao_oci = converter_data(data_efetivacao_oci) if data_efetivacao_oci else None # Validar sequência lógica das datas validar_sequencia_datas( data_nascimento=data_nascimento, data_entrada=data_entrada_oci, data_efetivacao=data_efetivacao_oci ) except ValueError as e: return jsonify({ 'status': 'error', 'message': str(e) }), 400 # Atualizar dados básicos if nome: militante.nome = nome if cpf: militante.cpf = cpf if titulo_eleitoral: militante.titulo_eleitoral = titulo_eleitoral militante.data_nascimento = data_nascimento militante.data_entrada_oci = data_entrada_oci militante.data_efetivacao_oci = data_efetivacao_oci militante.telefone1 = telefone1 militante.telefone2 = telefone2 # Atualizar email if email: # Verificar se já existe email email_existente = db.query(EmailMilitante).filter_by(militante_id=militante_id).first() if email_existente: email_existente.endereco_email = email else: novo_email = EmailMilitante(endereco_email=email, militante_id=militante_id) db.add(novo_email) # Calcular idade militante.idade = calcular_idade(data_nascimento) if data_nascimento else None try: db.commit() return jsonify({ 'status': 'success', 'message': 'Militante atualizado com sucesso', 'data': { 'nome': militante.nome, 'cpf': militante.cpf, 'idade': militante.idade, 'emails': [e.endereco_email for e in militante.emails], 'telefone1': militante.telefone1, 'celula_id': str(militante.celula_id) if militante.celula_id else None, 'responsabilidades_valor': militante.responsabilidades } }) except Exception as e: db.rollback() app.logger.error(f"Erro ao salvar militante: {e}") return jsonify({ 'status': 'error', 'message': 'Erro ao salvar alterações no banco de dados' }), 500 except Exception as e: app.logger.error(f"Erro ao editar militante: {e}") return jsonify({ 'status': 'error', 'message': 'Erro interno do servidor' }), 500 # Rota para criar um novo usuário @app.route("/usuarios/novo", methods=["GET", "POST"]) @login_required def novo_usuario(): if request.method == "POST": username = request.form.get("username") password = request.form.get("password") email = request.form.get("email") role_id = request.form.get("role_id") setor_id = request.form.get("setor_id") # Verificar se usuário já existe db = get_db_connection() try: if db.query(Usuario).filter_by(username=username).first(): flash('Nome de usuário já existe.', 'danger') return render_template("novo_usuario.html") novo_usuario = Usuario( username=username, email=email, role_id=role_id, setor_id=setor_id ) novo_usuario.set_password(password) novo_usuario.otp_secret = pyotp.random_base32() db.add(novo_usuario) db.commit() flash('Usuário cadastrado com sucesso!', 'success') return redirect(url_for('listar_usuarios')) except Exception as e: db.rollback() print(f"Erro ao cadastrar usuário: {e}") flash('Erro ao cadastrar usuário', 'danger') return render_template("novo_usuario.html") finally: db.close() db = get_db_connection() try: roles = db.query(Role).order_by(Role.nome).all() setores = db.query(Setor).order_by(Setor.nome).all() return render_template("novo_usuario.html", roles=roles, setores=setores) finally: db.close() # Rota para verificar status da sessão @app.route('/check_session') def check_session(): if 'user_id' not in session: return jsonify({'status': 'expired'}) if 'last_activity' in session: last_activity = datetime.fromtimestamp(session['last_activity']) now = datetime.now() if now - last_activity > timedelta(hours=2): # Registrar o logout por timeout try: db = get_db_connection() user = db.query(Usuario).get(session.get('user_id')) if user: user.ultimo_logout = datetime.now() user.motivo_logout = "Timeout de sessão" db.commit() db.close() except Exception as e: print(f"Erro ao registrar logout por timeout: {e}") session.clear() return jsonify({'status': 'expired'}) return jsonify({'status': 'active'}) @app.route("/qr/") def get_qr_code(token): db = get_db_connection() try: user = db.query(Usuario).filter_by(username='admin').first() if not user: flash('Usuário não encontrado', 'error') return redirect(url_for('login')) qr_uri = user.get_otp_uri() return render_template('mostrar_qr_code.html', qr_uri=qr_uri) finally: db.close() # Adicionar nova rota para API de setores @app.route("/api/setores/") @require_login def get_setores(cr_id): setores = db_session.query(Setor).filter_by(cr_id=cr_id).all() return jsonify({ 'setores': [{'id': s.id, 'nome': s.nome} for s in setores] }) @app.route('/celulas//militantes') @require_login def list_militantes_celula(celula_id): @require_instance_access('celula', celula_id) def decorated_function(celula_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(celula_id=celula_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(celula_id) @app.route('/setores//militantes') @require_login def list_militantes_setor(setor_id): @require_instance_access('setor', setor_id) def decorated_function(setor_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(setor_id=setor_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(setor_id) @app.route('/crs//militantes') @require_login def list_militantes_cr(cr_id): @require_instance_access('cr', cr_id) def decorated_function(cr_id): db = get_db_connection() try: militantes = db.query(Usuario).filter_by(cr_id=cr_id).all() return render_template('militantes/list.html', militantes=militantes) finally: db.close() return decorated_function(cr_id) @app.route('/celulas//comprovantes') @require_login def list_comprovantes_celula(celula_id): @require_instance_access('celula', celula_id) def decorated_function(celula_id): db = get_db_connection() try: comprovantes = db.query(Comprovante).filter_by(celula_id=celula_id).all() return render_template('comprovantes/list.html', comprovantes=comprovantes) finally: db.close() return decorated_function(celula_id) @app.route('/setores//comprovantes') @require_login def list_comprovantes_setor(setor_id): @require_instance_access('setor', setor_id) def decorated_function(setor_id): db = get_db_connection() try: comprovantes = db.query(Comprovante).join(Usuario).filter(Usuario.setor_id == setor_id).all() return render_template('comprovantes/list.html', comprovantes=comprovantes) finally: db.close() return decorated_function(setor_id) @app.route('/crs//comprovantes') @require_login def list_comprovantes_cr(cr_id): @require_instance_access('cr', cr_id) def decorated_function(cr_id): db = get_db_connection() try: comprovantes = db.query(Comprovante).join(Usuario).filter(Usuario.cr_id == cr_id).all() return render_template('comprovantes/list.html', comprovantes=comprovantes) finally: db.close() return decorated_function(cr_id) @app.route('/celulas//comprovantes/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_CELL_RECEIPT', 'celula_id') def novo_comprovante_celula(celula_id): if request.method == 'POST': db = get_db_connection() try: comprovante = Comprovante( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], celula_id=celula_id ) db.add(comprovante) db.commit() flash('Comprovante registrado com sucesso!', 'success') return redirect(url_for('list_comprovantes_celula', celula_id=celula_id)) finally: db.close() return render_template('comprovantes/form.html') @app.route('/setores//comprovantes/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_SECTOR_RECEIPT', 'setor_id') def novo_comprovante_setor(setor_id): if request.method == 'POST': db = get_db_connection() try: comprovante = Comprovante( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], setor_id=setor_id ) db.add(comprovante) db.commit() flash('Comprovante registrado com sucesso!', 'success') return redirect(url_for('list_comprovantes_setor', setor_id=setor_id)) finally: db.close() return render_template('comprovantes/form.html') @app.route('/crs//comprovantes/novo', methods=['GET', 'POST']) @require_login @require_instance_permission('REGISTER_CR_RECEIPT', 'cr_id') def novo_comprovante_cr(cr_id): if request.method == 'POST': db = get_db_connection() try: comprovante = Comprovante( valor=request.form['valor'], data=request.form['data'], militante_id=request.form['militante_id'], cr_id=cr_id ) db.add(comprovante) db.commit() flash('Comprovante registrado com sucesso!', 'success') return redirect(url_for('list_comprovantes_cr', cr_id=cr_id)) finally: db.close() return render_template('comprovantes/form.html') @app.route('/dashboard') @login_required def dashboard(): """Rota para a página inicial do dashboard""" try: session = get_db_connection() # Buscar dados recentes comprovantes_recentes = session.query(Comprovante).order_by(Comprovante.data_comprovante.desc()).limit(5).all() tipos_comprovante = session.query(TipoComprovante).all() return render_template( 'dashboard.html', comprovantes_recentes=comprovantes_recentes, tipos_comprovante=tipos_comprovante ) except Exception as e: flash(f'Erro ao carregar dashboard: {str(e)}', 'error') return redirect(url_for('index')) finally: session.close() @app.route('/comprovantes') @login_required @require_permission(Permission.MANAGE_MATERIALS) def listar_comprovantes(): try: db = get_db_connection() comprovantes = db.query(Comprovante)\ .options(joinedload(Comprovante.centralizacoes))\ .options(joinedload(Comprovante.militante))\ .options(joinedload(Comprovante.campanha))\ .all() militantes = db.query(Militante).order_by(Militante.nome).all() campanhas = db.query(CampanhaFinanceira).all() return render_template('listar_comprovantes.html', comprovantes=comprovantes, militantes=militantes, campanhas=campanhas) except Exception as e: flash(f'Erro ao listar comprovantes: {str(e)}', 'error') return redirect(url_for('dashboard')) finally: db.close() @app.route('/comprovantes/', methods=['DELETE']) @login_required @require_permission(Permission.MANAGE_MATERIALS) def excluir_comprovante(id): try: comprovante = Comprovante.query.get_or_404(id) db.session.delete(comprovante) db.session.commit() return jsonify({'success': True}) except Exception as e: db.session.rollback() return jsonify({'success': False, 'message': str(e)}) return app def init_system(): """Inicializa o sistema com todos os usuários necessários""" print("Inicializando sistema...") # Inicializar banco de dados print("Inicializando banco de dados...") init_database() db = get_db_connection() try: # Verificar admin admin = db.query(Usuario).filter_by(username='admin').first() if admin: print("\nAdmin configurado:") print(f"Username: admin") print(f"Senha: admin123") if admin.otp_secret: print("OTP: Usando configuração existente") else: admin.otp_secret = pyotp.random_base32() db.commit() print("OTP: Nova configuração gerada") else: # Criar admin se não existir create_admin_user() # Verificar usuários de teste test_users = ['aligner', 'tester', 'deployer'] for username in test_users: user = db.query(Usuario).filter_by(username=username).first() if user: print(f"\nUsuário {username} configurado:") print(f"Username: {username}") print(f"Senha: Test123!@#") if user.otp_secret: print("OTP: Usando configuração existente") else: user.otp_secret = pyotp.random_base32() db.commit() print("OTP: Nova configuração gerada") else: # Criar usuário de teste se não existir create_admin_user() finally: db.close() print("\nInstruções:") print("1. Configure o OTP usando o QR code gerado (apenas para novos usuários)") print("2. Faça login com as credenciais fornecidas") print("3. Altere a senha no primeiro login") print("\nCredenciais:") print("Admin:") print("Username: admin") print("Senha: admin123") print("\nUsuários de teste:") print("Username: aligner, tester, deployer") print("Senha: Test123!@#") def main(): # Criar a aplicação app = create_app() # Inicializar o sistema init_system() # Configurar modo debug app.debug = True app.config['DEBUG'] = True return app # Criar a aplicação usando a função main app = main() if __name__ == '__main__': app.run( host='0.0.0.0', port=int(os.getenv('FLASK_PORT', 5000)), debug=True )