Files
controles/app.py

1710 lines
64 KiB
Python

from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify, send_file
from functions.database import (
Base,
Militante,
CotaMensal,
TipoPagamento,
Pagamento,
MaterialVendido,
TipoMaterial,
VendaJornalAvulso,
engine,
AssinaturaAnual,
RelatorioCotasMensais,
RelatorioVendasMateriais,
Usuario,
get_db_connection,
ComiteRegional,
Setor,
Celula,
ComiteCentral,
EmailMilitante,
init_database,
EstadoMilitante,
Endereco,
)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, joinedload
from datetime import datetime, timedelta
from flask_bootstrap import Bootstrap5
from functions.validations import validar_cpf
from functools import wraps
from pathlib import Path
from time import time
from flask_mail import Mail, Message
import os
from dotenv import load_dotenv
from functions.rbac import Role, Permission, init_rbac
from functions.decorators import require_permission, require_role, require_minimum_role, require_login, require_instance_permission, require_instance_access
import re
import secrets
import pyotp
import qrcode
import base64
from io import BytesIO
from create_admin import create_admin_user
from create_test_users import create_test_users
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import random
import string
from sqlalchemy.sql import func
from flask_wtf.csrf import CSRFProtect
import json
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16))
bootstrap = Bootstrap5(app)
# Configurar CSRF Protection
csrf = CSRFProtect(app)
# Configurar Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# Adicionar filtros Jinja2
@app.template_filter('bitwise_and')
def bitwise_and(value1, value2):
"""Filtro para operação bit a bit AND"""
return value1 & value2
@login_manager.user_loader
def load_user(user_id):
"""Carrega o usuário pelo ID"""
db = get_db_connection()
try:
# Carregar o usuário com suas roles
user = db.query(Usuario).options(
joinedload(Usuario.roles)
).get(user_id)
return user
finally:
db.close()
def generate_qr_code(user):
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code
# Configuração da sessão do SQLAlchemy
db_session = get_db_connection()
# Configurar Flask-Mail
app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER', 'smtp.gmail.com')
app.config['MAIL_PORT'] = int(os.getenv('MAIL_PORT', 587))
app.config['MAIL_USE_TLS'] = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true'
app.config['MAIL_USERNAME'] = os.getenv('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.getenv('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = os.getenv('MAIL_DEFAULT_SENDER')
mail = Mail(app)
# Inicializar banco de dados e RBAC
print("Inicializando banco de dados...")
init_database()
print("Inicializando sistema RBAC...")
init_rbac()
# Criar admin e usuários de teste
print("Criando usuários iniciais...")
create_admin_user()
create_test_users()
# Decorator para verificar se o usuário está logado
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Por favor, faça login para acessar esta página.', 'warning')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
# Decorator para verificar se a sessão expirou
def session_timeout(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if current_user.is_authenticated:
if 'last_activity' not in session:
session['last_activity'] = time()
return f(*args, **kwargs)
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
# Se passaram mais de 30 minutos (configurável)
timeout_minutes = 30
if now - last_activity > timedelta(minutes=timeout_minutes):
# Registrar o logout por timeout
try:
current_user.ultimo_logout = datetime.now()
current_user.motivo_logout = "Timeout de sessão"
db_session.commit()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning')
return redirect(url_for('login'))
# Atualizar timestamp de último acesso
session['last_activity'] = time()
# Atualizar também no banco de dados
try:
current_user.update_last_activity()
db_session.commit()
except Exception as e:
print(f"Erro ao atualizar última atividade: {e}")
return f(*args, **kwargs)
return f(*args, **kwargs)
return decorated_function
# Rota raiz - redireciona para login se não estiver autenticado
@app.route("/")
@require_login
def index():
"""Rota principal"""
return redirect(url_for('home'))
# Rota de login
@app.route("/login", methods=["GET", "POST"])
def login():
"""Rota de login"""
if request.method == "POST":
email_or_username = request.form.get("email")
password = request.form.get("password")
otp = request.form.get("otp")
if not all([email_or_username, password]):
flash("Email/usuário e senha são obrigatórios.", "danger")
return redirect(url_for("login"))
db = get_db_connection()
try:
# Tenta encontrar o usuário por email ou username
user = db.query(Usuario).filter(
(Usuario.email == email_or_username) |
(Usuario.username == email_or_username)
).first()
if not user or not user.check_password(password):
flash("Email/usuário ou senha incorretos.", "danger")
return redirect(url_for("login"))
# Verificar OTP se o usuário tiver configurado
if user.otp_secret and not otp:
flash("Código OTP é obrigatório para sua conta.", "danger")
return redirect(url_for("login"))
if user.otp_secret and not user.verify_otp(otp):
flash("Código OTP inválido.", "danger")
return redirect(url_for("login"))
# Atualizar último login
user.ultimo_login = datetime.utcnow()
db.commit()
# Fazer login e setar sessão
login_user(user)
session['user_id'] = user.id
session['username'] = user.username
session['is_admin'] = user.is_admin
# Redirecionar para home
return redirect(url_for("home"))
finally:
db.close()
return render_template("login.html")
# Rota de logout
@app.route("/logout")
@login_required
def logout():
db = get_db_connection()
try:
user = current_user
if user:
user.logout()
db.commit()
logout_user()
finally:
db.close()
flash('Logout realizado com sucesso!', 'success')
return redirect(url_for('login'))
# Rota home (protegida)
@app.route("/home")
@require_login
def home():
"""Página inicial do sistema com dashboard"""
db = get_db_connection()
try:
# Buscar nome do usuário
usuario = db.query(Usuario).get(session.get('user_id'))
nome_usuario = usuario.username if usuario else "Usuário"
# Formatar data atual em português
data_atual = datetime.now().strftime("%d de %B de %Y")
# Buscar dados para o dashboard
total_militantes = db.query(Militante).count()
total_cotas = db.query(func.sum(CotaMensal.valor_novo)).scalar() or 0
total_materiais = db.query(MaterialVendido).count()
total_assinaturas = db.query(AssinaturaAnual).count()
# Buscar últimos militantes cadastrados
ultimos_militantes = db.query(Militante)\
.order_by(Militante.id.desc())\
.limit(5)\
.all()
# Buscar últimos pagamentos
ultimos_pagamentos = db.query(Pagamento)\
.join(Militante)\
.order_by(Pagamento.data_pagamento.desc())\
.limit(5)\
.all()
# Buscar tipos de pagamento
tipos_pagamento = db.query(TipoPagamento).all()
return render_template('home.html',
nome_usuario=nome_usuario,
data_atual=data_atual,
total_militantes=total_militantes,
total_cotas="{:.2f}".format(total_cotas),
total_materiais=total_materiais,
total_assinaturas=total_assinaturas,
ultimos_militantes=ultimos_militantes,
ultimos_pagamentos=ultimos_pagamentos,
tipos_pagamento=tipos_pagamento)
except Exception as e:
print(f"Erro na página inicial: {e}")
import traceback
traceback.print_exc()
flash('Erro ao carregar a página inicial', 'danger')
return render_template('home.html',
nome_usuario="Usuário",
data_atual=datetime.now().strftime("%d/%m/%Y"),
total_militantes=0,
total_cotas="0.00",
total_materiais=0,
total_assinaturas=0,
ultimos_militantes=[],
ultimos_pagamentos=[])
finally:
db.close()
# Rota para criar um novo militante
@app.route("/militantes/criar", methods=["POST"])
@require_login
@require_permission('gerenciar_militantes')
def criar_militante():
"""Cria um novo militante"""
db = get_db_connection()
try:
# Validar CPF
cpf = request.form.get('cpf')
if not validar_cpf(cpf):
return jsonify({
'status': 'error',
'message': 'CPF inválido'
}), 400
# Verificar se já existe militante com este CPF
militante_existente = db.query(Militante).filter(Militante.cpf == cpf).first()
if militante_existente:
return jsonify({
'status': 'error',
'message': 'CPF já cadastrado'
}), 400
# Criar endereço
endereco = Endereco(
cep=request.form.get('cep'),
estado=request.form.get('estado'),
cidade=request.form.get('cidade'),
bairro=request.form.get('bairro'),
logradouro=request.form.get('logradouro'),
numero=request.form.get('numero'),
complemento=request.form.get('complemento')
)
db.add(endereco)
db.flush() # Gerar ID do endereço
# Criar militante
militante = Militante(
# Dados Básicos
nome=request.form.get('nome'),
cpf=cpf,
titulo_eleitoral=request.form.get('titulo_eleitoral'),
data_nascimento=datetime.strptime(request.form.get('data_nascimento'), '%Y-%m-%d') if request.form.get('data_nascimento') else None,
data_entrada_oci=datetime.strptime(request.form.get('data_entrada_oci'), '%Y-%m-%d') if request.form.get('data_entrada_oci') else None,
data_efetivacao_oci=datetime.strptime(request.form.get('data_efetivacao_oci'), '%Y-%m-%d') if request.form.get('data_efetivacao_oci') else None,
# Contato
telefone1=request.form.get('telefone1'),
telefone2=request.form.get('telefone2'),
endereco_id=endereco.id,
# Profissional
profissao=request.form.get('profissao'),
regime_trabalho=request.form.get('regime_trabalho'),
empresa=request.form.get('empresa'),
contratante=request.form.get('contratante'),
# Acadêmico
instituicao_ensino=request.form.get('instituicao_ensino'),
tipo_instituicao=request.form.get('tipo_instituicao'),
# Sindical
sindicato=request.form.get('sindicato'),
cargo_sindical=request.form.get('cargo_sindical'),
central_sindical=request.form.get('central_sindical'),
dirigente_sindical=request.form.get('dirigente_sindical') == 'on',
# Organização
estado=EstadoMilitante(request.form.get('estado', 'ATIVO')),
celula_id=request.form.get('celula_id', type=int),
responsabilidades=request.form.get('responsabilidades', type=int, default=0),
# Por padrão, todo novo militante é aspirante
aspirante=True,
data_inicio_aspirante=datetime.now()
)
db.add(militante)
db.flush() # Gerar ID do militante
# Criar email principal
email = EmailMilitante(
email=request.form.get('email'),
principal=True,
militante_id=militante.id
)
db.add(email)
db.commit()
return jsonify({
'status': 'success',
'message': 'Militante criado com sucesso!'
})
except Exception as e:
db.rollback()
return jsonify({
'status': 'error',
'message': f'Erro ao criar militante: {str(e)}'
}), 500
finally:
db.close()
# Rota para listar militantes
@app.route("/militantes")
@require_login
@require_permission(Permission.MANAGE_CELL_MEMBERS)
def listar_militantes():
db = get_db_connection()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
celulas = db.query(Celula).order_by(Celula.nome).all()
return render_template('listar_militantes.html', militantes=militantes, Militante=Militante, celulas=celulas)
finally:
db.close()
# Rota para excluir militante
@app.route("/militantes/excluir/<int:id>", methods=["POST"])
@login_required
@session_timeout
def excluir_militante(id):
try:
db = get_db_connection()
militante = db.query(Militante).get(id)
if not militante:
flash('Militante não encontrado', 'danger')
return redirect(url_for('listar_militantes'))
db.delete(militante)
db.commit()
flash('Militante excluído com sucesso!', 'success')
except Exception as e:
db.rollback()
print(f"Erro ao excluir militante: {e}")
flash('Erro ao excluir militante', 'danger')
finally:
db.close()
return redirect(url_for('listar_militantes'))
# Rota para criar uma nova cota
@app.route("/cotas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def nova_cota():
if request.method == "POST":
militante_id = request.form.get("militante_id")
valor_antigo = float(request.form.get("valor_antigo"))
valor_novo = float(request.form.get("valor_novo"))
data_alteracao = datetime.strptime(request.form.get("data_alteracao"), "%Y-%m-%d").date()
data_vencimento = datetime.strptime(request.form.get("data_vencimento"), "%Y-%m-%d").date()
db = get_db_connection()
try:
cotas_mensais = CotaMensal(
militante_id=militante_id,
valor_antigo=valor_antigo,
valor_novo=valor_novo,
data_alteracao=data_alteracao,
data_vencimento=data_vencimento,
pago=False
)
db.add(cotas_mensais)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Cota cadastrada com sucesso!'
})
flash('Cota cadastrada com sucesso!', 'success')
return redirect(url_for('listar_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar cota: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar cota. Verifique os dados e tente novamente.'
}), 400
flash('Erro ao cadastrar cota', 'danger')
return render_template("nova_cota.html")
finally:
db.close()
db = get_db_connection()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("nova_cota.html", militantes=militantes)
finally:
db.close()
# Rota para listar cotas
@app.route("/cotas")
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def listar_cotas():
try:
print("Buscando cotas...")
db = get_db_connection()
cotas = db.query(CotaMensal)\
.join(Militante)\
.order_by(CotaMensal.data_vencimento.desc())\
.all()
# Calcular status de cada cota
for cota in cotas:
if cota.pago:
cota.status = "paga"
elif cota.data_vencimento < datetime.now().date():
cota.status = "atrasada"
else:
cota.status = "pendente"
# Buscar militantes para o modal de nova cota
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("listar_cotas.html", cotas=cotas, militantes=militantes)
except Exception as e:
print(f"Erro ao listar cotas: {e}")
flash('Erro ao listar cotas', 'danger')
return render_template("listar_cotas.html", cotas=[], militantes=[])
finally:
db.close()
# Rota para editar cota
@app.route('/cotas/editar/<int:id>', methods=['GET', 'POST'])
@login_required
@session_timeout
def editar_cota(id):
db = get_db_connection()
try:
cota = db.query(CotaMensal).get_or_404(id)
if request.method == 'POST':
try:
print("Dados recebidos:", request.form)
cota.militante_id = int(request.form['militante_id'])
cota.valor_antigo = float(request.form['valor_antigo'])
cota.valor_novo = float(request.form['valor_novo'])
cota.data_alteracao = datetime.strptime(request.form['data_alteracao'], '%Y-%m-%d').date()
cota.data_vencimento = datetime.strptime(request.form['data_vencimento'], '%Y-%m-%d').date()
# Processar o campo pago
pago = request.form.get('pago', '').lower()
print("Valor do campo pago recebido:", pago)
cota.pago = pago == 'true'
print("Status final do pago:", cota.pago)
db.commit()
print("Commit realizado com sucesso")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Cota atualizada com sucesso!'
})
flash('Cota atualizada com sucesso!', 'success')
return redirect(url_for('listar_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao atualizar cota: {str(e)}")
print(f"Tipo do erro: {type(e)}")
import traceback
traceback.print_exc()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': f'Erro ao atualizar cota: {str(e)}'
}), 400
flash('Erro ao atualizar cota. Verifique os dados e tente novamente.', 'danger')
return redirect(url_for('editar_cota', id=id))
return render_template('editar_cota.html', cota=cota)
except Exception as e:
print(f"Erro ao carregar cota: {str(e)}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': f'Erro ao carregar cota: {str(e)}'
}), 404
flash('Erro ao carregar cota', 'danger')
return redirect(url_for('listar_cotas'))
finally:
db.close()
# Rota para criar um novo pagamento
@app.route("/pagamentos/novo", methods=["GET", "POST"])
@require_login
def novo_pagamento():
if request.method == "POST":
militante_id = request.form.get("militante_id")
tipo_pagamento_id = request.form.get("tipo_pagamento_id")
valor = float(request.form.get("valor"))
data_pagamento = datetime.strptime(request.form.get("data_pagamento"), "%Y-%m-%d").date()
# Criar novo pagamento
db = get_db_connection()
try:
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento_id=tipo_pagamento_id,
valor=valor,
data_pagamento=data_pagamento
)
db.add(pagamento)
db.commit()
flash('Pagamento cadastrado com sucesso!', 'success')
return redirect(url_for('listar_pagamentos'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar pagamento: {e}")
flash('Erro ao cadastrar pagamento', 'danger')
return render_template("novo_pagamento.html")
finally:
db.close()
# GET - Renderizar formulário
db = get_db_connection()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all()
return render_template("novo_pagamento.html", militantes=militantes, tipos_pagamento=tipos_pagamento)
finally:
db.close()
# Rota para listar pagamentos
@app.route("/pagamentos")
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def listar_pagamentos():
try:
print("Buscando pagamentos...")
db = get_db_connection()
pagamentos = db.query(Pagamento)\
.join(Militante)\
.order_by(Pagamento.data_pagamento.desc())\
.all()
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_pagamento = db.query(TipoPagamento).all()
return render_template("listar_pagamentos.html",
pagamentos=pagamentos,
militantes=militantes,
tipos_pagamento=tipos_pagamento)
except Exception as e:
print(f"Erro ao listar pagamentos: {e}")
flash('Erro ao listar pagamentos', 'danger')
return render_template("listar_pagamentos.html", pagamentos=[], militantes=[])
finally:
db.close()
# Rota para adicionar pagamento
@app.route("/pagamentos/adicionar", methods=["POST"])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def adicionar_pagamento():
if request.method == "POST":
try:
militante_id = request.form.get("militante_id")
tipo_pagamento = request.form.get("tipo_pagamento")
valor = float(request.form.get("valor"))
data_pagamento = datetime.strptime(request.form.get("data_pagamento"), "%Y-%m-%d").date()
db = get_db_connection()
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento=tipo_pagamento,
valor=valor,
data_pagamento=data_pagamento
)
db.add(pagamento)
db.commit()
flash('Pagamento adicionado com sucesso!', 'success')
except Exception as e:
db.rollback()
flash(f'Erro ao adicionar pagamento: {str(e)}', 'danger')
finally:
db.close()
return redirect(url_for('listar_pagamentos'))
# Rota para criar um novo material vendido
@app.route("/materiais/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_material():
db = get_db_connection()
try:
militante_id = request.form.get('militante_id')
tipo_material_id = request.form.get('tipo_material_id')
descricao = request.form.get('descricao')
valor = float(request.form.get('valor'))
data_venda = datetime.strptime(request.form.get('data_venda'), '%Y-%m-%d')
material = MaterialVendido(
militante_id=militante_id,
tipo_material_id=tipo_material_id,
descricao=descricao,
valor=valor,
data_venda=data_venda
)
db.add(material)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Material cadastrado com sucesso!'
})
flash('Material cadastrado com sucesso!', 'success')
return redirect(url_for('listar_materiais'))
except Exception as e:
db.rollback()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar material. Por favor, tente novamente.'
}), 400
flash('Erro ao cadastrar material. Por favor, tente novamente.', 'error')
return redirect(url_for('listar_materiais'))
finally:
db.close()
# Rota para listar materiais vendidos
@app.route("/materiais")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_materiais():
db = get_db_connection()
try:
materiais = db.query(MaterialVendido).join(Militante).join(TipoMaterial).all()
militantes = db.query(Militante).all()
tipos_material = db.query(TipoMaterial).all()
return render_template('listar_materiais.html',
materiais=materiais,
militantes=militantes,
tipos_material=tipos_material)
finally:
db.close()
# Rota para criar uma nova venda de jornal
@app.route("/jornais/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def nova_venda_jornal():
db = get_db_connection()
try:
militante_id = request.form.get('militante_id')
quantidade = int(request.form.get('quantidade'))
valor_total = float(request.form.get('valor_total'))
data_venda = datetime.strptime(request.form.get('data_venda'), '%Y-%m-%d')
venda = VendaJornalAvulso(
militante_id=militante_id,
quantidade=quantidade,
valor_total=valor_total,
data_venda=data_venda
)
db.add(venda)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Venda cadastrada com sucesso!'
})
flash('Venda cadastrada com sucesso!', 'success')
return redirect(url_for('listar_vendas_jornal'))
except Exception as e:
db.rollback()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar venda. Por favor, tente novamente.'
}), 400
flash('Erro ao cadastrar venda. Por favor, tente novamente.', 'error')
return redirect(url_for('listar_vendas_jornal'))
finally:
db.close()
# Rota para listar vendas de jornal
@app.route("/jornais")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_vendas_jornal():
db = get_db_connection()
try:
vendas = db.query(VendaJornalAvulso).join(Militante).all()
militantes = db.query(Militante).all()
return render_template('listar_vendas_jornal.html',
vendas=vendas,
militantes=militantes)
finally:
db.close()
# Rota para criar um novo relatório de cotas
@app.route("/relatorios/cotas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_relatorio_cotas():
if request.method == "POST":
setor_id = request.form.get("setor_id")
comite_id = request.form.get("comite_id")
total_cotas = float(request.form.get("total_cotas"))
data_relatorio = datetime.strptime(request.form.get("data_relatorio"), "%Y-%m-%d").date()
db = get_db_connection()
try:
relatorio_cotas_mensais = RelatorioCotasMensais(
setor_id=setor_id,
comite_id=comite_id,
total_cotas=total_cotas,
data_relatorio=data_relatorio
)
db.add(relatorio_cotas_mensais)
db.commit()
flash('Relatório de cotas cadastrado com sucesso!', 'success')
return redirect(url_for('listar_relatorios_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar relatório de cotas: {e}")
flash('Erro ao cadastrar relatório de cotas', 'danger')
return render_template("novo_relatorio_cotas.html")
finally:
db.close()
db = get_db_connection()
try:
setores = db.query(Setor).order_by(Setor.nome).all()
comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all()
return render_template("novo_relatorio_cotas.html", setores=setores, comites=comites)
finally:
db.close()
# Rota para listar relatórios de cotas
@app.route("/relatorios/cotas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_relatorios_cotas():
try:
db = get_db_connection()
relatorios = db.query(RelatorioCotasMensais).order_by(RelatorioCotasMensais.data_relatorio.desc()).all()
return render_template("listar_relatorios_cotas.html", relatorios=relatorios)
except Exception as e:
print(f"Erro ao listar relatórios de cotas: {e}")
flash('Erro ao listar relatórios de cotas', 'danger')
return render_template("listar_relatorios_cotas.html", relatorios=[])
finally:
db.close()
# Rota para criar um novo relatório de vendas
@app.route("/relatorios/vendas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_relatorio_vendas():
if request.method == "POST":
setor_id = request.form.get("setor_id")
comite_id = request.form.get("comite_id")
total_vendas = float(request.form.get("total_vendas"))
data_relatorio = datetime.strptime(request.form.get("data_relatorio"), "%Y-%m-%d").date()
db = get_db_connection()
try:
relatorio_vendas_materiais = RelatorioVendasMateriais(
setor_id=setor_id,
comite_id=comite_id,
total_vendas=total_vendas,
data_relatorio=data_relatorio
)
db.add(relatorio_vendas_materiais)
db.commit()
flash('Relatório de vendas cadastrado com sucesso!', 'success')
return redirect(url_for('listar_relatorios_vendas'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar relatório de vendas: {e}")
flash('Erro ao cadastrar relatório de vendas', 'danger')
return render_template("novo_relatorio_vendas.html")
finally:
db.close()
db = get_db_connection()
try:
setores = db.query(Setor).order_by(Setor.nome).all()
comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all()
return render_template("novo_relatorio_vendas.html", setores=setores, comites=comites)
finally:
db.close()
# Rota para listar relatórios de vendas
@app.route("/relatorios/vendas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_relatorios_vendas():
try:
db = get_db_connection()
relatorios = db.query(RelatorioVendasMateriais).order_by(RelatorioVendasMateriais.data_relatorio.desc()).all()
return render_template("listar_relatorios_vendas.html", relatorios=relatorios)
except Exception as e:
print(f"Erro ao listar relatórios de vendas: {e}")
flash('Erro ao listar relatórios de vendas', 'danger')
return render_template("listar_relatorios_vendas.html", relatorios=[])
finally:
db.close()
# Rota para editar militante
@app.route("/militantes/editar/<int:militante_id>", methods=["POST"])
@require_login
@require_permission('gerenciar_militantes')
def editar_militante(militante_id):
"""Edita um militante existente"""
print(f"Iniciando edição do militante {militante_id}")
print(f"Dados recebidos: {request.form}")
if request.method == "POST":
db = get_db_connection()
try:
militante = db.query(Militante).options(
joinedload(Militante.endereco),
joinedload(Militante.emails)
).get(militante_id)
if not militante:
print(f"Militante {militante_id} não encontrado")
return jsonify({
'status': 'error',
'message': 'Militante não encontrado'
}), 404
print(f"Militante encontrado: {militante.nome}")
# Dados Básicos
militante.nome = request.form.get('nome')
militante.cpf = request.form.get('cpf')
militante.titulo_eleitoral = request.form.get('titulo_eleitoral')
militante.data_nascimento = datetime.strptime(request.form.get('data_nascimento'), '%Y-%m-%d') if request.form.get('data_nascimento') else None
militante.data_entrada_oci = datetime.strptime(request.form.get('data_entrada_oci'), '%Y-%m-%d') if request.form.get('data_entrada_oci') else None
militante.data_efetivacao_oci = datetime.strptime(request.form.get('data_efetivacao_oci'), '%Y-%m-%d') if request.form.get('data_efetivacao_oci') else None
print("Dados básicos atualizados")
# Contato
militante.telefone1 = request.form.get('telefone1')
militante.telefone2 = request.form.get('telefone2')
# Email (atualizar o principal)
email = request.form.get('email')
if email:
if militante.emails:
militante.emails[0].endereco_email = email
else:
novo_email = EmailMilitante(
endereco_email=email,
militante_id=militante.id,
principal=True
)
db.add(novo_email)
print("Dados de contato atualizados")
# Endereço
if not militante.endereco:
militante.endereco = Endereco()
db.add(militante.endereco)
militante.endereco.cep = request.form.get('cep')
militante.endereco.estado = request.form.get('estado')
militante.endereco.cidade = request.form.get('cidade')
militante.endereco.bairro = request.form.get('bairro')
militante.endereco.rua = request.form.get('rua')
militante.endereco.numero = request.form.get('numero')
militante.endereco.complemento = request.form.get('complemento')
print("Dados de endereço atualizados")
# Profissional
militante.profissao = request.form.get('profissao')
militante.regime_trabalho = request.form.get('regime_trabalho')
militante.empresa = request.form.get('empresa')
militante.contratante = request.form.get('contratante')
# Acadêmico
militante.instituicao_ensino = request.form.get('instituicao_ensino')
militante.tipo_instituicao = request.form.get('tipo_instituicao')
# Sindical
militante.sindicato = request.form.get('sindicato')
militante.cargo_sindical = request.form.get('cargo_sindical')
militante.central_sindical = request.form.get('central_sindical')
militante.dirigente_sindical = request.form.get('dirigente_sindical') == 'on'
print("Dados profissionais e sindicais atualizados")
# Organização
estado_str = request.form.get('estado', 'ATIVO').lower()
print(f"Estado recebido: {estado_str}")
# Converter o estado para o enum
try:
militante.estado = EstadoMilitante[estado_str.upper()]
print(f"Estado convertido: {militante.estado}")
except (KeyError, ValueError) as e:
print(f"Erro ao converter estado: {str(e)}")
return jsonify({
'status': 'error',
'message': f'Estado inválido: {estado_str}'
}), 400
# Tratar celula_id corretamente
celula_id = request.form.get('celula_id')
if celula_id:
try:
militante.celula_id = int(celula_id)
except (ValueError, TypeError):
militante.celula_id = None
else:
militante.celula_id = None
# Tratar responsabilidades corretamente
try:
responsabilidades_json = request.form.get('responsabilidades')
if responsabilidades_json:
responsabilidades_lista = json.loads(responsabilidades_json)
valor_responsabilidades = 0
if 'Finanças' in responsabilidades_lista:
valor_responsabilidades |= Militante.RESPONSAVEL_FINANCAS
if 'Imprensa' in responsabilidades_lista:
valor_responsabilidades |= Militante.RESPONSAVEL_IMPRENSA
if 'Quadro-Orientador' in responsabilidades_lista:
valor_responsabilidades |= Militante.QUADRO_ORIENTADOR
militante.responsabilidades = valor_responsabilidades
else:
militante.responsabilidades = 0
except (ValueError, TypeError, json.JSONDecodeError) as e:
print(f"Erro ao processar responsabilidades: {str(e)}")
militante.responsabilidades = 0
print("Dados organizacionais atualizados")
# Se o estado mudou para DESLIGADO, registrar data e motivo
if militante.estado == EstadoMilitante.DESLIGADO:
militante.data_desligamento = datetime.now()
militante.motivo_desligamento = request.form.get('motivo_desligamento')
db.commit()
print("Alterações salvas com sucesso")
return jsonify({
'status': 'success',
'message': f'Militante {militante.nome} atualizado com sucesso!'
})
except Exception as e:
print(f"Erro ao atualizar militante: {str(e)}")
db.rollback()
return jsonify({
'status': 'error',
'message': f'Erro ao atualizar militante: {str(e)}'
}), 500
finally:
db.close()
# Rota para criar um novo usuário
@app.route("/usuarios/novo", methods=["GET", "POST"])
@login_required
def novo_usuario():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
email = request.form.get("email")
role_id = request.form.get("role_id")
setor_id = request.form.get("setor_id")
# Verificar se usuário já existe
db = get_db_connection()
try:
if db.query(Usuario).filter_by(username=username).first():
flash('Nome de usuário já existe.', 'danger')
return render_template("novo_usuario.html")
novo_usuario = Usuario(
username=username,
email=email,
role_id=role_id,
setor_id=setor_id
)
novo_usuario.set_password(password)
novo_usuario.otp_secret = pyotp.random_base32()
db.add(novo_usuario)
db.commit()
flash('Usuário cadastrado com sucesso!', 'success')
return redirect(url_for('listar_usuarios'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar usuário: {e}")
flash('Erro ao cadastrar usuário', 'danger')
return render_template("novo_usuario.html")
finally:
db.close()
db = get_db_connection()
try:
roles = db.query(Role).order_by(Role.nome).all()
setores = db.query(Setor).order_by(Setor.nome).all()
return render_template("novo_usuario.html", roles=roles, setores=setores)
finally:
db.close()
# Rota para verificar status da sessão
@app.route('/check_session')
def check_session():
if 'user_id' not in session:
return jsonify({'status': 'expired'})
if 'last_activity' in session:
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
if now - last_activity > timedelta(hours=2):
# Registrar o logout por timeout
try:
db = get_db_connection()
user = db.query(Usuario).get(session.get('user_id'))
if user:
user.ultimo_logout = datetime.now()
user.motivo_logout = "Timeout de sessão"
db.commit()
db.close()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
return jsonify({'status': 'expired'})
return jsonify({'status': 'active'})
@app.route("/qr/<token>")
def get_qr_code(token):
db = get_db_connection()
try:
user = db.query(Usuario).filter_by(username='admin').first()
if not user:
flash('Usuário não encontrado', 'error')
return redirect(url_for('login'))
qr_uri = user.get_otp_uri()
return render_template('mostrar_qr_code.html', qr_uri=qr_uri)
finally:
db.close()
# Adicionar nova rota para API de setores
@app.route("/api/setores/<int:cr_id>")
@require_login
def get_setores(cr_id):
setores = db_session.query(Setor).filter_by(cr_id=cr_id).all()
return jsonify({
'setores': [{'id': s.id, 'nome': s.nome} for s in setores]
})
@app.route('/celulas/<int:celula_id>/militantes')
@require_login
def list_militantes_celula(celula_id):
@require_instance_access('celula', celula_id)
def decorated_function(celula_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(celula_id=celula_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(celula_id)
@app.route('/setores/<int:setor_id>/militantes')
@require_login
def list_militantes_setor(setor_id):
@require_instance_access('setor', setor_id)
def decorated_function(setor_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(setor_id=setor_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(setor_id)
@app.route('/crs/<int:cr_id>/militantes')
@require_login
def list_militantes_cr(cr_id):
@require_instance_access('cr', cr_id)
def decorated_function(cr_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(cr_id=cr_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(cr_id)
@app.route('/celulas/<int:celula_id>/pagamentos')
@require_login
def list_pagamentos_celula(celula_id):
@require_instance_access('celula', celula_id)
def decorated_function(celula_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(celula_id)
@app.route('/setores/<int:setor_id>/pagamentos')
@require_login
def list_pagamentos_setor(setor_id):
@require_instance_access('setor', setor_id)
def decorated_function(setor_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(setor_id)
@app.route('/crs/<int:cr_id>/pagamentos')
@require_login
def list_pagamentos_cr(cr_id):
@require_instance_access('cr', cr_id)
def decorated_function(cr_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(cr_id)
@app.route('/celulas/<int:celula_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id')
def novo_pagamento_celula(celula_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
celula_id=celula_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_celula', celula_id=celula_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route('/setores/<int:setor_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id')
def novo_pagamento_setor(setor_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
setor_id=setor_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_setor', setor_id=setor_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route('/crs/<int:cr_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id')
def novo_pagamento_cr(cr_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
cr_id=cr_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_cr', cr_id=cr_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route("/alterar_senha", methods=["GET", "POST"])
@require_login
def alterar_senha():
"""Rota para alterar a senha do usuário"""
if request.method == "POST":
senha_atual = request.form.get("senha_atual")
nova_senha = request.form.get("nova_senha")
confirmar_senha = request.form.get("confirmar_senha")
if not all([senha_atual, nova_senha, confirmar_senha]):
flash("Todos os campos são obrigatórios.", "error")
return redirect(url_for("alterar_senha"))
if nova_senha != confirmar_senha:
flash("As senhas não coincidem.", "error")
return redirect(url_for("alterar_senha"))
db = get_db_connection()
try:
user = db.query(Usuario).get(current_user.id)
if not user.check_password(senha_atual):
flash("Senha atual incorreta.", "error")
return redirect(url_for("alterar_senha"))
user.password_hash = generate_password_hash(nova_senha)
db.commit()
flash("Senha alterada com sucesso!", "success")
return redirect(url_for("home"))
finally:
db.close()
return render_template("alterar_senha.html")
@app.route('/usuarios/<int:user_id>/toggle_status', methods=['POST'])
@require_login
def toggle_user_status(user_id):
user = db_session.query(Usuario).get_or_404(user_id)
# Verificar permissões baseado na hierarquia
if not current_user.has_permission('system_config'):
if current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode gerenciar membros do seu CR
if user.cr_id != current_user.cr_id:
flash('Você não tem permissão para gerenciar este usuário.', 'danger')
return redirect(url_for('dashboard_admin'))
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode gerenciar membros do seu setor
if user.setor_id != current_user.setor_id:
flash('Você não tem permissão para gerenciar este usuário.', 'danger')
return redirect(url_for('dashboard_admin'))
elif current_user.has_permission('manage_cell_members'):
# Secretário de Célula só pode gerenciar membros da sua célula
if user.celula_id != current_user.celula_id:
flash('Você não tem permissão para gerenciar este usuário.', 'danger')
return redirect(url_for('dashboard_admin'))
else:
# Militante básico não pode gerenciar ninguém
flash('Você não tem permissão para gerenciar usuários.', 'danger')
return redirect(url_for('dashboard_admin'))
user.ativo = not user.ativo
db_session.commit()
return jsonify({'success': True, 'message': 'Status do usuário alterado com sucesso!'})
@app.route('/usuarios/<int:user_id>/alterar_nivel', methods=['POST'])
@require_login
def alterar_nivel(user_id):
user = db_session.query(Usuario).get_or_404(user_id)
novo_nivel = request.json.get('nivel')
# Verificar permissões baseado na hierarquia
if not current_user.has_permission('system_config'):
if current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode alterar níveis dentro do seu CR
if user.cr_id != current_user.cr_id:
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'})
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode alterar níveis dentro do seu setor
if user.setor_id != current_user.setor_id:
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'})
else:
# Outros níveis não podem alterar níveis
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar níveis de usuários.'})
# Verificar se o novo nível é válido para o nível hierárquico do usuário atual
if current_user.has_permission('system_config'):
# Secretário Geral e Secretário de Organização podem alterar para qualquer nível
pass
elif current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode alterar para níveis do CR
if novo_nivel not in ['membro_cr', 'secretario_cr']:
return jsonify({'success': False, 'message': 'Nível inválido para este CR.'})
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode alterar para níveis do setor
if novo_nivel not in ['membro_setor', 'secretario_setor']:
return jsonify({'success': False, 'message': 'Nível inválido para este setor.'})
# Atualizar o nível do usuário
user.role = novo_nivel
db_session.commit()
return jsonify({'success': True, 'message': 'Nível do usuário alterado com sucesso!'})
@app.route('/usuarios/<int:user_id>/toggle_quadro_orientador', methods=['POST'])
@require_login
def toggle_quadro_orientador(user_id):
db = get_db_connection()
try:
user = db.query(Usuario).get(user_id)
if not user:
return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404
# Verificar permissões
if not (current_user.has_permission('system_config') or
(current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or
(current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)):
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar esta responsabilidade'}), 403
# Verificar se o usuário tem um militante associado
if not user.militante:
return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400
# Alternar o status de Quadro-Orientador
user.militante.quadro_orientador = not user.militante.quadro_orientador
# Atualizar a responsabilidade no campo responsabilidades
if user.militante.quadro_orientador:
user.militante.responsabilidades |= Militante.QUADRO_ORIENTADOR
else:
user.militante.responsabilidades &= ~Militante.QUADRO_ORIENTADOR
db.commit()
return jsonify({
'success': True,
'message': f'Responsabilidade de Quadro-Orientador {"adicionada" if user.militante.quadro_orientador else "removida"} com sucesso'
})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 500
finally:
db.close()
@app.route("/cotas/excluir/<int:id>", methods=["POST"])
@login_required
@session_timeout
def excluir_cota(id):
"""Exclui uma cota mensal"""
db = get_db_connection()
try:
cota = db.query(CotaMensal).get(id)
if not cota:
flash('Cota não encontrada.', 'danger')
return redirect(url_for('listar_cotas'))
# Excluir a cota
db.delete(cota)
db.commit()
flash('Cota excluída com sucesso!', 'success')
except Exception as e:
db.rollback()
flash('Erro ao excluir cota. Por favor, tente novamente.', 'danger')
print(f"Erro ao excluir cota: {e}")
finally:
db.close()
return redirect(url_for('listar_cotas'))
@app.route("/assinaturas")
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def listar_assinaturas():
db = get_db_connection()
try:
assinaturas = db.query(AssinaturaAnual).join(Militante).all()
militantes = db.query(Militante).all()
return render_template('listar_assinaturas.html',
assinaturas=assinaturas,
militantes=militantes)
finally:
db.close()
@app.route("/assinaturas/novo", methods=['POST'])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def nova_assinatura():
db = get_db_connection()
try:
data = request.form
# Validar dados
if not all(k in data for k in ['militante_id', 'data_inicio', 'data_fim', 'valor']):
return jsonify({'success': False, 'message': 'Todos os campos são obrigatórios'})
# Criar nova assinatura
assinatura = AssinaturaAnual(
militante_id=data['militante_id'],
data_inicio=datetime.strptime(data['data_inicio'], '%Y-%m-%d'),
data_fim=datetime.strptime(data['data_fim'], '%Y-%m-%d'),
valor=float(data['valor'])
)
db.add(assinatura)
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)})
finally:
db.close()
@app.route("/assinaturas/excluir/<int:id>", methods=['POST'])
@require_login
@require_permission(Permission.MANAGE_CELL_REPORTS)
def excluir_assinatura(id):
db = get_db_connection()
try:
assinatura = db.query(AssinaturaAnual).get(id)
if not assinatura:
return jsonify({'success': False, 'message': 'Assinatura não encontrada'})
db.delete(assinatura)
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)})
finally:
db.close()
@app.route("/militantes/dados/<int:militante_id>")
@require_login
@require_permission('gerenciar_militantes')
def buscar_dados_militante(militante_id):
"""Retorna os dados de um militante"""
db = get_db_connection()
try:
militante = db.query(Militante).options(
joinedload(Militante.endereco),
joinedload(Militante.emails)
).get(militante_id)
if not militante:
return jsonify({
'status': 'error',
'message': 'Militante não encontrado'
}), 404
# Converter responsabilidades para lista de strings
responsabilidades = []
if militante.responsabilidades & Militante.RESPONSAVEL_FINANCAS:
responsabilidades.append('Finanças')
if militante.responsabilidades & Militante.RESPONSAVEL_IMPRENSA:
responsabilidades.append('Imprensa')
if militante.responsabilidades & Militante.QUADRO_ORIENTADOR:
responsabilidades.append('Quadro-Orientador')
return jsonify({
'nome': militante.nome,
'cpf': militante.cpf,
'titulo_eleitoral': militante.titulo_eleitoral,
'data_nascimento': militante.data_nascimento.strftime('%Y-%m-%d') if militante.data_nascimento else None,
'data_entrada_oci': militante.data_entrada_oci.strftime('%Y-%m-%d') if militante.data_entrada_oci else None,
'data_efetivacao_oci': militante.data_efetivacao_oci.strftime('%Y-%m-%d') if militante.data_efetivacao_oci else None,
'telefone1': militante.telefone1,
'telefone2': militante.telefone2,
'email': militante.emails[0].endereco_email if militante.emails else None,
'endereco': {
'cep': militante.endereco.cep if militante.endereco else None,
'estado': militante.endereco.estado if militante.endereco else None,
'cidade': militante.endereco.cidade if militante.endereco else None,
'bairro': militante.endereco.bairro if militante.endereco else None,
'rua': militante.endereco.rua if militante.endereco else None,
'numero': militante.endereco.numero if militante.endereco else None,
'complemento': militante.endereco.complemento if militante.endereco else None
} if militante.endereco else None,
'profissao': militante.profissao,
'regime_trabalho': militante.regime_trabalho,
'empresa': militante.empresa,
'contratante': militante.contratante,
'instituicao_ensino': militante.instituicao_ensino,
'tipo_instituicao': militante.tipo_instituicao,
'sindicato': militante.sindicato,
'cargo_sindical': militante.cargo_sindical,
'central_sindical': militante.central_sindical,
'dirigente_sindical': militante.dirigente_sindical,
'estado': militante.estado.name if militante.estado else None,
'celula_id': militante.celula_id,
'responsabilidades': responsabilidades
})
except Exception as e:
print(f"Erro ao buscar dados do militante: {str(e)}")
return jsonify({
'status': 'error',
'message': f'Erro ao buscar dados do militante: {str(e)}'
}), 500
finally:
db.close()
def create_app():
app = Flask(__name__)
# ... existing code ...
# ... existing code ...
return app
def init_system():
"""Inicializa o sistema com todos os usuários necessários"""
print("Inicializando sistema...")
# Inicializar banco de dados
print("Inicializando banco de dados...")
init_database()
db = get_db_connection()
try:
# Verificar admin
admin = db.query(Usuario).filter_by(username='admin').first()
if admin:
print("\nAdmin configurado:")
print(f"Username: admin")
print(f"Senha: admin123")
if admin.otp_secret:
print("OTP: Usando configuração existente")
else:
admin.otp_secret = pyotp.random_base32()
db.commit()
print("OTP: Nova configuração gerada")
else:
# Criar admin se não existir
create_admin_user()
# Verificar usuários de teste
test_users = ['aligner', 'tester', 'deployer']
for username in test_users:
user = db.query(Usuario).filter_by(username=username).first()
if user:
print(f"\nUsuário {username} configurado:")
print(f"Username: {username}")
print(f"Senha: Test123!@#")
if user.otp_secret:
print("OTP: Usando configuração existente")
else:
user.otp_secret = pyotp.random_base32()
db.commit()
print("OTP: Nova configuração gerada")
else:
# Criar usuário de teste se não existir
create_test_users()
finally:
db.close()
print("\nInstruções:")
print("1. Configure o OTP usando o QR code gerado (apenas para novos usuários)")
print("2. Faça login com as credenciais fornecidas")
print("3. Altere a senha no primeiro login")
print("\nCredenciais:")
print("Admin:")
print("Username: admin")
print("Senha: admin123")
print("\nUsuários de teste:")
print("Username: aligner, tester, deployer")
print("Senha: Test123!@#")
if __name__ == '__main__':
init_system()
app.run(
host='0.0.0.0',
port=5000,
debug=os.getenv('FLASK_ENV') == 'development'
)