Files
controles/app.py

1732 lines
70 KiB
Python
Raw Normal View History

from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify, send_file
2025-01-08 00:19:49 -03:00
from functions.database import (
Base,
Militante,
CotaMensal,
TipoPagamento,
Pagamento,
MaterialVendido,
TipoMaterial,
VendaJornalAvulso,
engine,
AssinaturaAnual,
RelatorioCotasMensais,
RelatorioVendasMateriais,
Usuario,
get_db_connection,
ComiteRegional,
2025-04-03 11:24:47 -03:00
Setor,
Celula,
ComiteCentral,
EmailMilitante,
init_database,
EstadoMilitante,
Endereco,
2025-01-08 00:19:49 -03:00
)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, joinedload
from datetime import datetime, timedelta, date
from flask_bootstrap import Bootstrap5
from functions.validations import validar_cpf
from functools import wraps
from pathlib import Path
from time import time
from flask_mail import Mail, Message
2025-04-03 11:24:47 -03:00
import os
from dotenv import load_dotenv
from functions.rbac import Role, Permission, init_rbac
from functions.decorators import require_permission, require_role, require_minimum_role, require_login, require_instance_permission, require_instance_access
import re
import secrets
import pyotp
import qrcode
import base64
from io import BytesIO
from create_admin import create_admin_user
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import random
import string
from sqlalchemy.sql import func
from flask_wtf.csrf import CSRFProtect
import json
from utils.date_utils import validar_data, converter_data, validar_sequencia_datas, calcular_idade
from routes.admin import admin_bp # Importar o blueprint administrativo
import sys
2025-04-03 11:24:47 -03:00
load_dotenv()
2025-04-09 09:59:12 -03:00
def create_app():
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16))
bootstrap = Bootstrap5(app)
# Registrar o blueprint administrativo
app.register_blueprint(admin_bp)
2025-04-09 09:59:12 -03:00
# Configurar CSRF Protection
csrf = CSRFProtect()
csrf.init_app(app)
2025-04-09 09:59:12 -03:00
# Configurar cabeçalhos CSRF personalizados
app.config['WTF_CSRF_CHECK_DEFAULT'] = False
app.config['WTF_CSRF_HEADERS'] = ['X-CSRFToken']
2025-04-09 09:59:12 -03:00
# Configurar Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# Adicionar filtros Jinja2
@app.template_filter('bitwise_and')
def bitwise_and(value1, value2):
"""Filtro para operação bit a bit AND"""
return value1 & value2
@login_manager.user_loader
def load_user(user_id):
"""Carrega o usuário pelo ID"""
db = get_db_connection()
try:
# Carregar o usuário com suas roles
user = db.query(Usuario).options(
joinedload(Usuario.roles)
).get(user_id)
return user
finally:
db.close()
2025-04-09 09:59:12 -03:00
def generate_qr_code(user):
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code
# Configurar Flask-Mail
app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER', 'smtp.gmail.com')
app.config['MAIL_PORT'] = int(os.getenv('MAIL_PORT', 587))
app.config['MAIL_USE_TLS'] = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true'
app.config['MAIL_USERNAME'] = os.getenv('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.getenv('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = os.getenv('MAIL_DEFAULT_SENDER')
mail = Mail(app)
# Configuração da sessão do SQLAlchemy
db_session = get_db_connection()
# Decorator para verificar se o usuário está logado
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Por favor, faça login para acessar esta página.', 'warning')
return redirect(url_for('login'))
from sqlalchemy.orm import Session
db = get_db_connection()
try:
# Carregar o usuário com suas roles e permissões
user = db.query(Usuario).options(
joinedload(Usuario.roles).joinedload(Role.permissions),
joinedload(Usuario.militante),
joinedload(Usuario.cr),
joinedload(Usuario.setor),
joinedload(Usuario.celula)
).get(session['user_id'])
if not user:
flash('Usuário não encontrado.', 'danger')
return redirect(url_for('login'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
2025-04-09 09:59:12 -03:00
return decorated_function
# Decorator para verificar se a sessão expirou
def session_timeout(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if current_user.is_authenticated:
if 'last_activity' not in session:
session['last_activity'] = time()
return f(*args, **kwargs)
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
# Se passaram mais de 30 minutos (configurável)
timeout_minutes = 30
if now - last_activity > timedelta(minutes=timeout_minutes):
# Registrar o logout por timeout
try:
current_user.ultimo_logout = datetime.now()
current_user.motivo_logout = "Timeout de sessão"
db_session.commit()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning')
return redirect(url_for('login'))
# Atualizar timestamp de último acesso
session['last_activity'] = time()
2025-04-09 09:59:12 -03:00
# Atualizar também no banco de dados
try:
2025-04-09 09:59:12 -03:00
current_user.update_last_activity()
db_session.commit()
except Exception as e:
2025-04-09 09:59:12 -03:00
print(f"Erro ao atualizar última atividade: {e}")
2025-04-09 09:59:12 -03:00
return f(*args, **kwargs)
return f(*args, **kwargs)
return decorated_function
# Rota raiz - redireciona para login se não estiver autenticado
@app.route("/")
@require_login
def index():
"""Rota principal"""
return redirect(url_for('home'))
# Rota de login
@app.route("/login", methods=["GET", "POST"])
def login():
"""Rota de login"""
if request.method == "POST":
email_or_username = request.form.get("email")
password = request.form.get("password")
otp = request.form.get("otp")
2025-04-09 09:59:12 -03:00
if not all([email_or_username, password]):
flash("Email/usuário e senha são obrigatórios.", "danger")
return redirect(url_for("login"))
2025-04-09 09:59:12 -03:00
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
# Tenta encontrar o usuário por email ou username
user = db.query(Usuario).filter(
(Usuario.email == email_or_username) |
(Usuario.username == email_or_username)
).first()
if not user or not user.check_password(password):
flash("Email/usuário ou senha incorretos.", "danger")
return redirect(url_for("login"))
# Verificar OTP se o usuário tiver configurado
if user.otp_secret and not otp:
flash("Código OTP é obrigatório para sua conta.", "danger")
return redirect(url_for("login"))
if user.otp_secret and not user.verify_otp(otp):
flash("Código OTP inválido.", "danger")
return redirect(url_for("login"))
# Atualizar último login
user.ultimo_login = datetime.utcnow()
db.commit()
# Fazer login e setar sessão
login_user(user)
session['user_id'] = user.id
session['username'] = user.username
session['is_admin'] = user.is_admin
print(f"Login realizado: user_id={user.id}, username={user.username}, is_admin={user.is_admin}")
2025-04-09 09:59:12 -03:00
# Redirecionar para home
2025-04-09 09:59:12 -03:00
return redirect(url_for("home"))
finally:
db.close()
2025-04-09 09:59:12 -03:00
return render_template("login.html")
# Rota de logout
@app.route("/logout")
@login_required
def logout():
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
user = current_user
if user:
user.logout()
db.commit()
logout_user()
finally:
db.close()
flash('Logout realizado com sucesso!', 'success')
return redirect(url_for('login'))
# Rota home (protegida)
@app.route("/home")
@require_login
def home():
"""Página inicial do sistema com dashboard"""
db = get_db_connection()
try:
# Buscar nome do usuário
usuario = db.query(Usuario).get(session.get('user_id'))
nome_usuario = usuario.username if usuario else "Usuário"
# Formatar data atual em português
data_atual = datetime.now().strftime("%d de %B de %Y")
2025-04-09 09:59:12 -03:00
# Buscar dados para o dashboard
total_militantes = db.query(Militante).count()
total_cotas = db.query(func.sum(CotaMensal.valor_novo)).scalar() or 0
total_materiais = db.query(MaterialVendido).count()
total_assinaturas = db.query(AssinaturaAnual).count()
2025-04-09 09:59:12 -03:00
# Buscar últimos militantes cadastrados
ultimos_militantes = db.query(Militante)\
.order_by(Militante.id.desc())\
.limit(5)\
.all()
# Buscar últimos pagamentos
ultimos_pagamentos = db.query(Pagamento)\
.join(Militante)\
.order_by(Pagamento.data_pagamento.desc())\
.limit(5)\
.all()
# Buscar tipos de pagamento
tipos_pagamento = db.query(TipoPagamento).all()
2025-04-09 09:59:12 -03:00
return render_template('home.html',
nome_usuario=nome_usuario,
data_atual=data_atual,
total_militantes=total_militantes,
total_cotas="{:.2f}".format(total_cotas),
total_materiais=total_materiais,
total_assinaturas=total_assinaturas,
ultimos_militantes=ultimos_militantes,
ultimos_pagamentos=ultimos_pagamentos,
tipos_pagamento=tipos_pagamento,
Militante=Militante)
except Exception as e:
print(f"Erro na página inicial: {e}")
import traceback
traceback.print_exc()
flash('Erro ao carregar a página inicial', 'danger')
return render_template('home.html',
nome_usuario="Usuário",
data_atual=datetime.now().strftime("%d/%m/%Y"),
total_militantes=0,
total_cotas="0.00",
total_materiais=0,
total_assinaturas=0,
ultimos_militantes=[],
ultimos_pagamentos=[],
Militante=Militante)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para criar um novo militante
@app.route("/militantes/criar", methods=["POST"])
@require_login
@require_permission('gerenciar_militantes')
def criar_militante():
"""Cria um novo militante"""
db = get_db_connection()
try:
# Validar CPF
cpf = request.form.get('cpf')
if not validar_cpf(cpf):
return jsonify({
'status': 'error',
'message': 'CPF inválido'
}), 400
2025-04-03 11:24:47 -03:00
2025-04-09 09:59:12 -03:00
# Verificar se já existe militante com este CPF
militante_existente = db.query(Militante).filter(Militante.cpf == cpf).first()
if militante_existente:
return jsonify({
'status': 'error',
'message': 'CPF já cadastrado'
}), 400
2025-04-03 11:24:47 -03:00
2025-04-09 09:59:12 -03:00
# Criar endereço
endereco = Endereco(
cep=request.form.get('cep'),
estado=request.form.get('estado'),
cidade=request.form.get('cidade'),
bairro=request.form.get('bairro'),
logradouro=request.form.get('logradouro'),
numero=request.form.get('numero'),
complemento=request.form.get('complemento')
)
db.add(endereco)
db.flush() # Gerar ID do endereço
2025-04-03 11:24:47 -03:00
2025-04-09 09:59:12 -03:00
# Criar militante
militante = Militante(
# Dados Básicos
nome=request.form.get('nome'),
cpf=cpf,
titulo_eleitoral=request.form.get('titulo_eleitoral'),
data_nascimento=datetime.strptime(request.form.get('data_nascimento'), '%Y-%m-%d') if request.form.get('data_nascimento') else None,
data_entrada_oci=datetime.strptime(request.form.get('data_entrada_oci'), '%Y-%m-%d') if request.form.get('data_entrada_oci') else None,
data_efetivacao_oci=datetime.strptime(request.form.get('data_efetivacao_oci'), '%Y-%m-%d') if request.form.get('data_efetivacao_oci') else None,
# Contato
telefone1=request.form.get('telefone1'),
telefone2=request.form.get('telefone2'),
endereco_id=endereco.id,
# Profissional
profissao=request.form.get('profissao'),
regime_trabalho=request.form.get('regime_trabalho'),
empresa=request.form.get('empresa'),
contratante=request.form.get('contratante'),
# Acadêmico
instituicao_ensino=request.form.get('instituicao_ensino'),
tipo_instituicao=request.form.get('tipo_instituicao'),
# Sindical
sindicato=request.form.get('sindicato'),
cargo_sindical=request.form.get('cargo_sindical'),
central_sindical=request.form.get('central_sindical'),
dirigente_sindical=request.form.get('dirigente_sindical') == 'on',
# Organização
estado=EstadoMilitante(request.form.get('estado', 'ATIVO')),
celula_id=request.form.get('celula_id', type=int),
responsabilidades=request.form.get('responsabilidades', type=int, default=0),
# Por padrão, todo novo militante é aspirante
aspirante=True,
data_inicio_aspirante=datetime.now()
)
db.add(militante)
db.flush() # Gerar ID do militante
2025-04-03 11:24:47 -03:00
2025-04-09 09:59:12 -03:00
# Criar email principal
email = request.form.get('email')
if email:
if militante.emails:
militante.emails[0].endereco_email = email
else:
novo_email = EmailMilitante(
endereco_email=email,
militante_id=militante.id
)
db.add(novo_email)
militante.emails.append(novo_email)
2025-04-03 11:24:47 -03:00
db.commit()
2025-04-09 09:59:12 -03:00
return jsonify({
'status': 'success',
'message': 'Militante criado com sucesso!'
})
except Exception as e:
db.rollback()
2025-04-09 09:59:12 -03:00
return jsonify({
'status': 'error',
'message': f'Erro ao criar militante: {str(e)}'
}), 500
finally:
db.close()
2025-01-08 00:19:49 -03:00
2025-04-09 09:59:12 -03:00
# Rota para listar militantes
@app.route("/militantes")
@require_login
@require_permission(Permission.MANAGE_CELL_MEMBERS)
def listar_militantes():
db = get_db_connection()
2025-04-09 09:59:12 -03:00
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
celulas = db.query(Celula).order_by(Celula.nome).all()
return render_template('listar_militantes.html', militantes=militantes, Militante=Militante, celulas=celulas)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para excluir militante
@app.route("/militantes/excluir/<int:id>", methods=["POST"])
@login_required
@session_timeout
def excluir_militante(id):
try:
db = get_db_connection()
militante = db.query(Militante).get(id)
if not militante:
flash('Militante não encontrado', 'danger')
return redirect(url_for('listar_militantes'))
db.delete(militante)
db.commit()
flash('Militante excluído com sucesso!', 'success')
except Exception as e:
db.rollback()
print(f"Erro ao excluir militante: {e}")
flash('Erro ao excluir militante', 'danger')
finally:
db.close()
return redirect(url_for('listar_militantes'))
# Rota para criar uma nova cota
@app.route("/cotas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def nova_cota():
if request.method == "POST":
militante_id = request.form.get("militante_id")
valor_antigo = float(request.form.get("valor_antigo"))
valor_novo = float(request.form.get("valor_novo"))
data_alteracao = datetime.strptime(request.form.get("data_alteracao"), "%Y-%m-%d").date()
data_vencimento = datetime.strptime(request.form.get("data_vencimento"), "%Y-%m-%d").date()
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
cotas_mensais = CotaMensal(
militante_id=militante_id,
valor_antigo=valor_antigo,
valor_novo=valor_novo,
data_alteracao=data_alteracao,
data_vencimento=data_vencimento,
pago=False
)
db.add(cotas_mensais)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
2025-04-09 09:59:12 -03:00
'message': 'Cota cadastrada com sucesso!'
})
2025-04-09 09:59:12 -03:00
flash('Cota cadastrada com sucesso!', 'success')
return redirect(url_for('listar_cotas'))
except Exception as e:
db.rollback()
2025-04-09 09:59:12 -03:00
print(f"Erro ao cadastrar cota: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
2025-04-09 09:59:12 -03:00
'message': 'Erro ao cadastrar cota. Verifique os dados e tente novamente.'
}), 400
2025-04-09 09:59:12 -03:00
flash('Erro ao cadastrar cota', 'danger')
return render_template("nova_cota.html")
finally:
db.close()
2025-01-08 00:19:49 -03:00
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("nova_cota.html", militantes=militantes)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para listar cotas
@app.route("/cotas")
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def listar_cotas():
try:
print("Buscando cotas...")
db = get_db_connection()
cotas = db.query(CotaMensal)\
.join(Militante)\
.order_by(CotaMensal.data_vencimento.desc())\
.all()
# Calcular status de cada cota
for cota in cotas:
if cota.pago:
cota.status = "paga"
elif cota.data_vencimento < datetime.now().date():
cota.status = "atrasada"
else:
cota.status = "pendente"
# Buscar militantes para o modal de nova cota
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("listar_cotas.html", cotas=cotas, militantes=militantes)
except Exception as e:
print(f"Erro ao listar cotas: {e}")
flash('Erro ao listar cotas', 'danger')
return render_template("listar_cotas.html", cotas=[], militantes=[])
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para editar cota
@app.route('/cotas/editar/<int:id>', methods=['GET', 'POST'])
@login_required
@session_timeout
def editar_cota(id):
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
cota = db.query(CotaMensal).get_or_404(id)
if request.method == 'POST':
try:
print("Dados recebidos:", request.form)
cota.militante_id = int(request.form['militante_id'])
cota.valor_antigo = float(request.form['valor_antigo'])
cota.valor_novo = float(request.form['valor_novo'])
cota.data_alteracao = datetime.strptime(request.form['data_alteracao'], '%Y-%m-%d').date()
cota.data_vencimento = datetime.strptime(request.form['data_vencimento'], '%Y-%m-%d').date()
# Processar o campo pago
pago = request.form.get('pago', '').lower()
print("Valor do campo pago recebido:", pago)
cota.pago = pago == 'true'
print("Status final do pago:", cota.pago)
db.commit()
print("Commit realizado com sucesso")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Cota atualizada com sucesso!'
})
flash('Cota atualizada com sucesso!', 'success')
return redirect(url_for('listar_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao atualizar cota: {str(e)}")
print(f"Tipo do erro: {type(e)}")
import traceback
traceback.print_exc()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': f'Erro ao atualizar cota: {str(e)}'
}), 400
flash('Erro ao atualizar cota. Verifique os dados e tente novamente.', 'danger')
return redirect(url_for('editar_cota', id=id))
return render_template('editar_cota.html', cota=cota)
except Exception as e:
print(f"Erro ao carregar cota: {str(e)}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': f'Erro ao carregar cota: {str(e)}'
}), 404
flash('Erro ao carregar cota', 'danger')
return redirect(url_for('listar_cotas'))
finally:
db.close()
# Rota para criar um novo pagamento
@app.route("/pagamentos/novo", methods=["GET", "POST"])
@require_login
def novo_pagamento():
if request.method == "POST":
try:
militante_id = request.form.get("militante_id")
tipo_pagamento_id = request.form.get("tipo_pagamento_id")
valor = float(request.form.get("valor"))
data_pagamento = converter_data(request.form.get("data_pagamento"))
if not validar_data(data_pagamento):
flash('Data de pagamento inválida ou futura', 'danger')
return redirect(url_for('novo_pagamento'))
db = get_db_connection()
2025-04-09 09:59:12 -03:00
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento_id=tipo_pagamento_id,
valor=valor,
data_pagamento=data_pagamento
)
db.add(pagamento)
db.commit()
flash('Pagamento cadastrado com sucesso!', 'success')
return redirect(url_for('listar_pagamentos'))
except Exception as e:
db.rollback()
app.logger.error(f"Erro ao cadastrar pagamento: {e}")
2025-04-09 09:59:12 -03:00
flash('Erro ao cadastrar pagamento', 'danger')
return redirect(url_for('novo_pagamento'))
2025-04-09 09:59:12 -03:00
finally:
db.close()
# GET - Renderizar formulário
db = get_db_connection()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all()
return render_template("novo_pagamento.html", militantes=militantes, tipos_pagamento=tipos_pagamento)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para listar pagamentos
@app.route("/pagamentos")
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def listar_pagamentos():
try:
print("Buscando pagamentos...")
db = get_db_connection()
pagamentos = db.query(Pagamento)\
.join(Militante)\
.order_by(Pagamento.data_pagamento.desc())\
.all()
2025-04-09 09:59:12 -03:00
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_pagamento = db.query(TipoPagamento).all()
2025-04-09 09:59:12 -03:00
return render_template("listar_pagamentos.html",
pagamentos=pagamentos,
militantes=militantes,
tipos_pagamento=tipos_pagamento)
except Exception as e:
print(f"Erro ao listar pagamentos: {e}")
flash('Erro ao listar pagamentos', 'danger')
return render_template("listar_pagamentos.html", pagamentos=[], militantes=[])
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para adicionar pagamento
@app.route("/pagamentos/adicionar", methods=["POST"])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def adicionar_pagamento():
if request.method == "POST":
try:
militante_id = request.form.get("militante_id")
tipo_pagamento = request.form.get("tipo_pagamento")
valor = float(request.form.get("valor"))
data_pagamento = converter_data(request.form.get("data_pagamento"))
2025-01-08 00:19:49 -03:00
2025-04-09 09:59:12 -03:00
db = get_db_connection()
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento=tipo_pagamento,
valor=valor,
data_pagamento=data_pagamento
)
db.add(pagamento)
db.commit()
flash('Pagamento adicionado com sucesso!', 'success')
except Exception as e:
db.rollback()
flash(f'Erro ao adicionar pagamento: {str(e)}', 'danger')
finally:
db.close()
return redirect(url_for('listar_pagamentos'))
2025-04-09 09:59:12 -03:00
# Rota para criar um novo material vendido
@app.route("/materiais/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_material():
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
militante_id = request.form.get('militante_id')
tipo_material_id = request.form.get('tipo_material_id')
descricao = request.form.get('descricao')
valor = float(request.form.get('valor'))
data_venda = converter_data(request.form.get('data_venda'))
2025-04-09 09:59:12 -03:00
material = MaterialVendido(
militante_id=militante_id,
tipo_material_id=tipo_material_id,
descricao=descricao,
valor=valor,
data_venda=data_venda
)
2025-04-09 09:59:12 -03:00
db.add(material)
db.commit()
2025-04-09 09:59:12 -03:00
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Material cadastrado com sucesso!'
})
flash('Material cadastrado com sucesso!', 'success')
return redirect(url_for('listar_materiais'))
except Exception as e:
db.rollback()
2025-04-09 09:59:12 -03:00
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar material. Por favor, tente novamente.'
}), 400
flash('Erro ao cadastrar material. Por favor, tente novamente.', 'error')
return redirect(url_for('listar_materiais'))
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para listar materiais vendidos
@app.route("/materiais")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_materiais():
db = get_db_connection()
2025-04-09 09:59:12 -03:00
try:
materiais = db.query(MaterialVendido).join(Militante).join(TipoMaterial).all()
militantes = db.query(Militante).all()
tipos_material = db.query(TipoMaterial).all()
return render_template('listar_materiais.html',
materiais=materiais,
militantes=militantes,
tipos_material=tipos_material)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para criar uma nova venda de jornal
@app.route("/jornais/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def nova_venda_jornal():
if request.method == "POST":
try:
militante_id = request.form.get('militante_id')
quantidade = int(request.form.get('quantidade'))
valor_total = float(request.form.get('valor_total'))
data_venda = converter_data(request.form.get('data_venda'))
if not validar_data(data_venda):
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Data de venda inválida ou futura'
}), 400
flash('Data de venda inválida ou futura', 'danger')
return redirect(url_for('nova_venda_jornal'))
db = get_db_connection()
venda = VendaJornalAvulso(
militante_id=militante_id,
quantidade=quantidade,
valor_total=valor_total,
data_venda=data_venda
)
db.add(venda)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Venda cadastrada com sucesso!'
})
flash('Venda cadastrada com sucesso!', 'success')
return redirect(url_for('listar_vendas_jornal'))
except Exception as e:
db.rollback()
app.logger.error(f"Erro ao cadastrar venda: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar venda'
}), 400
flash('Erro ao cadastrar venda', 'danger')
return redirect(url_for('nova_venda_jornal'))
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para listar vendas de jornal
@app.route("/jornais")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_vendas_jornal():
db = get_db_connection()
try:
vendas = db.query(VendaJornalAvulso).join(Militante).all()
militantes = db.query(Militante).all()
return render_template('listar_vendas_jornal.html',
vendas=vendas,
militantes=militantes)
finally:
db.close()
# Rota para criar um novo relatório de cotas
@app.route("/relatorios/cotas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_relatorio_cotas():
if request.method == "POST":
try:
setor_id = request.form.get("setor_id")
comite_id = request.form.get("comite_id")
total_cotas = float(request.form.get("total_cotas"))
data_relatorio = request.form.get("data_relatorio")
# Validar data
if not validar_data(data_relatorio):
flash('Data do relatório inválida', 'danger')
return render_template("novo_relatorio_cotas.html")
# Converter data
data_relatorio = converter_data(data_relatorio)
# Validar data futura
if data_relatorio > date.today():
flash('A data do relatório não pode ser futura', 'danger')
return render_template("novo_relatorio_cotas.html")
db = get_db_connection()
try:
relatorio_cotas_mensais = RelatorioCotasMensais(
setor_id=setor_id,
comite_id=comite_id,
total_cotas=total_cotas,
data_relatorio=data_relatorio
)
db.add(relatorio_cotas_mensais)
db.commit()
flash('Relatório de cotas cadastrado com sucesso!', 'success')
return redirect(url_for('listar_relatorios_cotas'))
except Exception as e:
db.rollback()
app.logger.error(f"Erro ao cadastrar relatório de cotas: {e}")
flash('Erro ao cadastrar relatório de cotas', 'danger')
return render_template("novo_relatorio_cotas.html")
finally:
db.close()
except ValueError as e:
flash(str(e), 'danger')
2025-04-09 09:59:12 -03:00
return render_template("novo_relatorio_cotas.html")
db = get_db_connection()
2025-04-09 09:59:12 -03:00
try:
setores = db.query(Setor).order_by(Setor.nome).all()
comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all()
return render_template("novo_relatorio_cotas.html",
setores=setores,
comites=comites,
hoje=date.today().strftime('%Y-%m-%d'))
2025-04-09 09:59:12 -03:00
finally:
db.close()
# Rota para listar relatórios de cotas
@app.route("/relatorios/cotas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_relatorios_cotas():
try:
db = get_db_connection()
relatorios = db.query(RelatorioCotasMensais).order_by(RelatorioCotasMensais.data_relatorio.desc()).all()
return render_template("listar_relatorios_cotas.html", relatorios=relatorios)
except Exception as e:
print(f"Erro ao listar relatórios de cotas: {e}")
flash('Erro ao listar relatórios de cotas', 'danger')
return render_template("listar_relatorios_cotas.html", relatorios=[])
finally:
db.close()
# Rota para criar um novo relatório de vendas
@app.route("/relatorios/vendas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_relatorio_vendas():
if request.method == "POST":
try:
setor_id = request.form.get("setor_id")
comite_id = request.form.get("comite_id")
total_vendas = float(request.form.get("total_vendas"))
data_relatorio = request.form.get("data_relatorio")
# Validar data
if not validar_data(data_relatorio):
flash('Data do relatório inválida', 'danger')
return render_template("novo_relatorio_vendas.html")
# Converter data
data_relatorio = converter_data(data_relatorio)
# Validar data futura
if data_relatorio > date.today():
flash('A data do relatório não pode ser futura', 'danger')
return render_template("novo_relatorio_vendas.html")
db = get_db_connection()
try:
relatorio_vendas_materiais = RelatorioVendasMateriais(
setor_id=setor_id,
comite_id=comite_id,
total_vendas=total_vendas,
data_relatorio=data_relatorio
)
db.add(relatorio_vendas_materiais)
db.commit()
flash('Relatório de vendas cadastrado com sucesso!', 'success')
return redirect(url_for('listar_relatorios_vendas'))
except Exception as e:
db.rollback()
app.logger.error(f"Erro ao cadastrar relatório de vendas: {e}")
flash('Erro ao cadastrar relatório de vendas', 'danger')
return render_template("novo_relatorio_vendas.html")
finally:
db.close()
except ValueError as e:
flash(str(e), 'danger')
2025-04-09 09:59:12 -03:00
return render_template("novo_relatorio_vendas.html")
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
setores = db.query(Setor).order_by(Setor.nome).all()
comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all()
return render_template("novo_relatorio_vendas.html",
setores=setores,
comites=comites,
hoje=date.today().strftime('%Y-%m-%d'))
2025-04-09 09:59:12 -03:00
finally:
db.close()
# Rota para listar relatórios de vendas
@app.route("/relatorios/vendas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_relatorios_vendas():
try:
db = get_db_connection()
relatorios = db.query(RelatorioVendasMateriais).order_by(RelatorioVendasMateriais.data_relatorio.desc()).all()
return render_template("listar_relatorios_vendas.html", relatorios=relatorios)
except Exception as e:
print(f"Erro ao listar relatórios de vendas: {e}")
flash('Erro ao listar relatórios de vendas', 'danger')
return render_template("listar_relatorios_vendas.html", relatorios=[])
finally:
db.close()
# Rota para editar militante
@app.route('/militantes/editar/<int:militante_id>', methods=['POST'])
@login_required
2025-04-09 09:59:12 -03:00
@require_permission('gerenciar_militantes')
def editar_militante(militante_id):
try:
db = get_db_connection()
militante = db.query(Militante).get(militante_id)
if not militante:
return jsonify({
'status': 'error',
'message': 'Militante não encontrado'
}), 404
# Obter dados do formulário
nome = request.form.get('nome')
cpf = request.form.get('cpf')
titulo_eleitoral = request.form.get('titulo_eleitoral')
data_nascimento = request.form.get('data_nascimento')
data_entrada_oci = request.form.get('data_entrada_oci')
data_efetivacao_oci = request.form.get('data_efetivacao_oci')
telefone1 = request.form.get('telefone1')
telefone2 = request.form.get('telefone2')
email = request.form.get('email')
# Validar e converter datas
try:
data_nascimento = converter_data(data_nascimento) if data_nascimento else None
data_entrada_oci = converter_data(data_entrada_oci) if data_entrada_oci else None
data_efetivacao_oci = converter_data(data_efetivacao_oci) if data_efetivacao_oci else None
# Validar sequência lógica das datas
validar_sequencia_datas(
data_nascimento=data_nascimento,
data_entrada=data_entrada_oci,
data_efetivacao=data_efetivacao_oci
)
except ValueError as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 400
# Atualizar dados básicos
if nome: militante.nome = nome
if cpf: militante.cpf = cpf
if titulo_eleitoral: militante.titulo_eleitoral = titulo_eleitoral
militante.data_nascimento = data_nascimento
militante.data_entrada_oci = data_entrada_oci
militante.data_efetivacao_oci = data_efetivacao_oci
militante.telefone1 = telefone1
militante.telefone2 = telefone2
# Atualizar email
if email:
# Verificar se já existe email
email_existente = db.query(EmailMilitante).filter_by(militante_id=militante_id).first()
if email_existente:
email_existente.endereco_email = email
else:
novo_email = EmailMilitante(endereco_email=email, militante_id=militante_id)
db.add(novo_email)
# Calcular idade
militante.idade = calcular_idade(data_nascimento) if data_nascimento else None
try:
db.commit()
return jsonify({
'status': 'success',
'message': 'Militante atualizado com sucesso',
'data': {
'nome': militante.nome,
'cpf': militante.cpf,
'idade': militante.idade,
'emails': [e.endereco_email for e in militante.emails],
'telefone1': militante.telefone1,
'celula_id': str(militante.celula_id) if militante.celula_id else None,
'responsabilidades_valor': militante.responsabilidades
}
})
except Exception as e:
db.rollback()
app.logger.error(f"Erro ao salvar militante: {e}")
return jsonify({
'status': 'error',
'message': 'Erro ao salvar alterações no banco de dados'
}), 500
except Exception as e:
app.logger.error(f"Erro ao editar militante: {e}")
return jsonify({
'status': 'error',
'message': 'Erro interno do servidor'
}), 500
2025-04-09 09:59:12 -03:00
# Rota para criar um novo usuário
@app.route("/usuarios/novo", methods=["GET", "POST"])
@login_required
def novo_usuario():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
email = request.form.get("email")
role_id = request.form.get("role_id")
setor_id = request.form.get("setor_id")
# Verificar se usuário já existe
db = get_db_connection()
try:
if db.query(Usuario).filter_by(username=username).first():
flash('Nome de usuário já existe.', 'danger')
return render_template("novo_usuario.html")
novo_usuario = Usuario(
username=username,
email=email,
role_id=role_id,
setor_id=setor_id
)
novo_usuario.set_password(password)
novo_usuario.otp_secret = pyotp.random_base32()
db.add(novo_usuario)
db.commit()
flash('Usuário cadastrado com sucesso!', 'success')
return redirect(url_for('listar_usuarios'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar usuário: {e}")
flash('Erro ao cadastrar usuário', 'danger')
return render_template("novo_usuario.html")
2025-04-09 09:59:12 -03:00
finally:
db.close()
2025-04-09 09:59:12 -03:00
db = get_db_connection()
try:
roles = db.query(Role).order_by(Role.nome).all()
setores = db.query(Setor).order_by(Setor.nome).all()
return render_template("novo_usuario.html", roles=roles, setores=setores)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Rota para verificar status da sessão
@app.route('/check_session')
def check_session():
if 'user_id' not in session:
return jsonify({'status': 'expired'})
2025-04-09 09:59:12 -03:00
if 'last_activity' in session:
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
2025-04-09 09:59:12 -03:00
if now - last_activity > timedelta(hours=2):
# Registrar o logout por timeout
try:
db = get_db_connection()
user = db.query(Usuario).get(session.get('user_id'))
if user:
user.ultimo_logout = datetime.now()
user.motivo_logout = "Timeout de sessão"
db.commit()
db.close()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
return jsonify({'status': 'expired'})
2025-04-09 09:59:12 -03:00
return jsonify({'status': 'active'})
2025-04-09 09:59:12 -03:00
@app.route("/qr/<token>")
def get_qr_code(token):
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
user = db.query(Usuario).filter_by(username='admin').first()
if not user:
flash('Usuário não encontrado', 'error')
return redirect(url_for('login'))
qr_uri = user.get_otp_uri()
return render_template('mostrar_qr_code.html', qr_uri=qr_uri)
finally:
db.close()
2025-04-09 09:59:12 -03:00
# Adicionar nova rota para API de setores
@app.route("/api/setores/<int:cr_id>")
@require_login
def get_setores(cr_id):
setores = db_session.query(Setor).filter_by(cr_id=cr_id).all()
return jsonify({
'setores': [{'id': s.id, 'nome': s.nome} for s in setores]
})
2025-04-09 09:59:12 -03:00
@app.route('/celulas/<int:celula_id>/militantes')
@require_login
def list_militantes_celula(celula_id):
@require_instance_access('celula', celula_id)
def decorated_function(celula_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(celula_id=celula_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(celula_id)
2025-04-09 09:59:12 -03:00
@app.route('/setores/<int:setor_id>/militantes')
@require_login
def list_militantes_setor(setor_id):
@require_instance_access('setor', setor_id)
def decorated_function(setor_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(setor_id=setor_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(setor_id)
2025-04-09 09:59:12 -03:00
@app.route('/crs/<int:cr_id>/militantes')
@require_login
def list_militantes_cr(cr_id):
@require_instance_access('cr', cr_id)
def decorated_function(cr_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(cr_id=cr_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(cr_id)
@app.route('/celulas/<int:celula_id>/pagamentos')
@require_login
def list_pagamentos_celula(celula_id):
@require_instance_access('celula', celula_id)
def decorated_function(celula_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(celula_id)
@app.route('/setores/<int:setor_id>/pagamentos')
@require_login
def list_pagamentos_setor(setor_id):
@require_instance_access('setor', setor_id)
def decorated_function(setor_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(setor_id)
@app.route('/crs/<int:cr_id>/pagamentos')
@require_login
def list_pagamentos_cr(cr_id):
@require_instance_access('cr', cr_id)
def decorated_function(cr_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(cr_id)
@app.route('/celulas/<int:celula_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id')
def novo_pagamento_celula(celula_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
celula_id=celula_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_celula', celula_id=celula_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route('/setores/<int:setor_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id')
def novo_pagamento_setor(setor_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
setor_id=setor_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_setor', setor_id=setor_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route('/crs/<int:cr_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id')
def novo_pagamento_cr(cr_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
cr_id=cr_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_cr', cr_id=cr_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route("/alterar_senha", methods=["GET", "POST"])
@require_login
def alterar_senha():
"""Rota para alterar a senha do usuário"""
if request.method == "POST":
senha_atual = request.form.get("senha_atual")
nova_senha = request.form.get("nova_senha")
confirmar_senha = request.form.get("confirmar_senha")
if not all([senha_atual, nova_senha, confirmar_senha]):
flash("Todos os campos são obrigatórios.", "error")
return redirect(url_for("alterar_senha"))
if nova_senha != confirmar_senha:
flash("As senhas não coincidem.", "error")
return redirect(url_for("alterar_senha"))
db = get_db_connection()
try:
user = db.query(Usuario).get(current_user.id)
if not user.check_password(senha_atual):
flash("Senha atual incorreta.", "error")
return redirect(url_for("alterar_senha"))
user.password_hash = generate_password_hash(nova_senha)
db.commit()
flash("Senha alterada com sucesso!", "success")
return redirect(url_for("home"))
finally:
db.close()
return render_template("alterar_senha.html")
@app.route('/usuarios/<int:user_id>/toggle_status', methods=['POST'])
@require_login
def toggle_user_status(user_id):
if not current_user.is_admin:
return jsonify({
'success': False,
'error': 'Você não tem permissão para alterar o status de usuários.'
}), 403
db = get_db_connection()
try:
usuario = db.query(Usuario).get(user_id)
if not usuario:
return jsonify({
'success': False,
'error': 'Usuário não encontrado.'
}), 404
usuario.ativo = not usuario.ativo
db.commit()
return jsonify({
'success': True,
'message': f'Usuário {"ativado" if usuario.ativo else "desativado"} com sucesso!'
})
except Exception as e:
db.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
finally:
db.close()
2025-04-09 09:59:12 -03:00
@app.route('/usuarios/<int:user_id>/alterar_nivel', methods=['POST'])
@require_login
def alterar_nivel(user_id):
user = db_session.query(Usuario).get_or_404(user_id)
novo_nivel = request.json.get('nivel')
# Verificar permissões baseado na hierarquia
if not current_user.has_permission('system_config'):
if current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode alterar níveis dentro do seu CR
if user.cr_id != current_user.cr_id:
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'})
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode alterar níveis dentro do seu setor
if user.setor_id != current_user.setor_id:
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'})
else:
# Outros níveis não podem alterar níveis
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar níveis de usuários.'})
# Verificar se o novo nível é válido para o nível hierárquico do usuário atual
if current_user.has_permission('system_config'):
# Secretário Geral e Secretário de Organização podem alterar para qualquer nível
pass
elif current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode alterar para níveis do CR
if novo_nivel not in ['membro_cr', 'secretario_cr']:
return jsonify({'success': False, 'message': 'Nível inválido para este CR.'})
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode alterar para níveis do setor
if novo_nivel not in ['membro_setor', 'secretario_setor']:
return jsonify({'success': False, 'message': 'Nível inválido para este setor.'})
# Atualizar o nível do usuário
user.role = novo_nivel
db_session.commit()
return jsonify({'success': True, 'message': 'Nível do usuário alterado com sucesso!'})
2025-04-09 09:59:12 -03:00
@app.route('/usuarios/<int:user_id>/toggle_quadro_orientador', methods=['POST'])
@require_login
def toggle_quadro_orientador(user_id):
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
user = db.query(Usuario).get(user_id)
if not user:
return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404
# Verificar permissões
if not (current_user.has_permission('system_config') or
(current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or
(current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)):
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar esta responsabilidade'}), 403
# Verificar se o usuário tem um militante associado
if not user.militante:
return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400
# Alternar o status de Quadro-Orientador
user.militante.quadro_orientador = not user.militante.quadro_orientador
# Atualizar a responsabilidade no campo responsabilidades
if user.militante.quadro_orientador:
user.militante.responsabilidades |= Militante.QUADRO_ORIENTADOR
else:
user.militante.responsabilidades &= ~Militante.QUADRO_ORIENTADOR
db.commit()
return jsonify({
'success': True,
'message': f'Responsabilidade de Quadro-Orientador {"adicionada" if user.militante.quadro_orientador else "removida"} com sucesso'
})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 500
finally:
db.close()
2025-04-09 09:59:12 -03:00
@app.route("/cotas/excluir/<int:id>", methods=["POST"])
@login_required
@session_timeout
def excluir_cota(id):
"""Exclui uma cota mensal"""
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
cota = db.query(CotaMensal).get(id)
if not cota:
flash('Cota não encontrada.', 'danger')
return redirect(url_for('listar_cotas'))
# Excluir a cota
db.delete(cota)
db.commit()
2025-04-09 09:59:12 -03:00
flash('Cota excluída com sucesso!', 'success')
except Exception as e:
db.rollback()
flash('Erro ao excluir cota. Por favor, tente novamente.', 'danger')
print(f"Erro ao excluir cota: {e}")
finally:
db.close()
2025-04-09 09:59:12 -03:00
return redirect(url_for('listar_cotas'))
2025-04-09 09:59:12 -03:00
@app.route("/assinaturas")
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def listar_assinaturas():
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
assinaturas = db.query(AssinaturaAnual).join(Militante).all()
militantes = db.query(Militante).all()
return render_template('listar_assinaturas.html',
assinaturas=assinaturas,
militantes=militantes)
finally:
db.close()
2025-04-09 09:59:12 -03:00
@app.route("/assinaturas/novo", methods=['POST'])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def nova_assinatura():
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
data = request.form
# Validar dados
if not all(k in data for k in ['militante_id', 'data_inicio', 'data_fim', 'valor']):
return jsonify({'success': False, 'message': 'Todos os campos são obrigatórios'})
# Criar nova assinatura
assinatura = AssinaturaAnual(
militante_id=data['militante_id'],
data_inicio=datetime.strptime(data['data_inicio'], '%Y-%m-%d'),
data_fim=datetime.strptime(data['data_fim'], '%Y-%m-%d'),
valor=float(data['valor'])
)
2025-04-09 09:59:12 -03:00
db.add(assinatura)
db.commit()
2025-04-09 09:59:12 -03:00
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)})
finally:
db.close()
2025-04-09 09:59:12 -03:00
@app.route("/assinaturas/excluir/<int:id>", methods=['POST'])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def excluir_assinatura(id):
db = get_db_connection()
try:
2025-04-09 09:59:12 -03:00
assinatura = db.query(AssinaturaAnual).get(id)
if not assinatura:
return jsonify({'success': False, 'message': 'Assinatura não encontrada'})
2025-04-09 09:59:12 -03:00
db.delete(assinatura)
db.commit()
2025-04-09 09:59:12 -03:00
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)})
finally:
db.close()
2025-04-09 09:59:12 -03:00
@app.route("/militantes/dados/<int:militante_id>")
@require_login
@require_permission('gerenciar_militantes')
def buscar_dados_militante(militante_id):
"""Busca os dados de um militante específico"""
2025-04-09 09:59:12 -03:00
db = get_db_connection()
try:
militante = db.query(Militante).get(militante_id)
2025-04-09 09:59:12 -03:00
if not militante:
print(f"Militante não encontrado: ID {militante_id}")
2025-04-09 09:59:12 -03:00
return jsonify({
'status': 'error',
'message': 'Militante não encontrado'
}), 404
# Função auxiliar para formatar data com validação
def formatar_data_segura(data):
try:
if not data:
return None
return data.strftime('%Y-%m-%d')
except Exception as e:
print(f"Erro ao formatar data: {str(e)}, valor: {data}")
return None
# Formatar datas com validação
data_nascimento = formatar_data_segura(militante.data_nascimento)
data_entrada_oci = formatar_data_segura(militante.data_entrada_oci)
data_efetivacao_oci = formatar_data_segura(militante.data_efetivacao_oci)
print(f"Dados do militante {militante_id} recuperados com sucesso")
print(f"Datas formatadas: nascimento={data_nascimento}, entrada={data_entrada_oci}, efetivação={data_efetivacao_oci}")
return jsonify({
'status': 'success',
2025-04-09 09:59:12 -03:00
'id': militante.id,
'nome': militante.nome,
'cpf': militante.cpf,
'titulo_eleitoral': militante.titulo_eleitoral,
'data_nascimento': data_nascimento,
'data_entrada_oci': data_entrada_oci,
'data_efetivacao_oci': data_efetivacao_oci,
'emails': [email.endereco_email for email in militante.emails] if militante.emails else [],
2025-04-09 09:59:12 -03:00
'telefone1': militante.telefone1,
'telefone2': militante.telefone2,
'celula_id': militante.celula_id,
'responsabilidades_valor': militante.responsabilidades,
2025-04-09 09:59:12 -03:00
'sindicato': militante.sindicato,
'cargo_sindical': militante.cargo_sindical,
'central_sindical': militante.central_sindical,
'dirigente_sindical': militante.dirigente_sindical
})
2025-04-09 09:59:12 -03:00
except Exception as e:
import traceback
print(f"Erro ao buscar dados do militante {militante_id}:")
print(traceback.format_exc())
return jsonify({
'status': 'error',
'message': f'Erro ao buscar dados do militante: {str(e)}'
2025-04-09 09:59:12 -03:00
}), 500
finally:
db.close()
return app
def init_system():
"""Inicializa o sistema com todos os usuários necessários"""
print("Inicializando sistema...")
# Inicializar banco de dados
print("Inicializando banco de dados...")
init_database()
db = get_db_connection()
try:
# Verificar admin
admin = db.query(Usuario).filter_by(username='admin').first()
if admin:
print("\nAdmin configurado:")
print(f"Username: admin")
print(f"Senha: admin123")
if admin.otp_secret:
print("OTP: Usando configuração existente")
else:
admin.otp_secret = pyotp.random_base32()
db.commit()
print("OTP: Nova configuração gerada")
else:
# Criar admin se não existir
create_admin_user()
# Verificar usuários de teste
test_users = ['aligner', 'tester', 'deployer']
for username in test_users:
user = db.query(Usuario).filter_by(username=username).first()
if user:
print(f"\nUsuário {username} configurado:")
print(f"Username: {username}")
print(f"Senha: Test123!@#")
if user.otp_secret:
print("OTP: Usando configuração existente")
else:
user.otp_secret = pyotp.random_base32()
db.commit()
print("OTP: Nova configuração gerada")
else:
# Criar usuário de teste se não existir
create_admin_user()
finally:
db.close()
print("\nInstruções:")
print("1. Configure o OTP usando o QR code gerado (apenas para novos usuários)")
print("2. Faça login com as credenciais fornecidas")
print("3. Altere a senha no primeiro login")
print("\nCredenciais:")
print("Admin:")
print("Username: admin")
print("Senha: admin123")
print("\nUsuários de teste:")
print("Username: aligner, tester, deployer")
print("Senha: Test123!@#")
2025-04-09 09:59:12 -03:00
def main():
# Criar a aplicação
app = create_app()
return app
# Criar a aplicação usando a função main
app = main()
if __name__ == '__main__':
# Verificar se é para inicializar o sistema
if '--init' in sys.argv:
init_system()
else:
app.run(
host='0.0.0.0',
port=5000,
debug=os.getenv('FLASK_ENV') == 'development'
)