Files
controles/app.py

1654 lines
63 KiB
Python
Raw Normal View History

from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify, send_file
2025-01-08 00:19:49 -03:00
from functions.database import (
Base,
Militante,
CotaMensal,
TipoPagamento,
Pagamento,
MaterialVendido,
TipoMaterial,
VendaJornalAvulso,
engine,
AssinaturaAnual,
RelatorioCotasMensais,
RelatorioVendasMateriais,
Usuario,
get_db_connection,
ComiteRegional,
2025-04-03 11:24:47 -03:00
Setor,
Celula,
ComiteCentral,
EmailMilitante,
init_database,
EstadoMilitante,
2025-01-08 00:19:49 -03:00
)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, joinedload
from datetime import datetime, timedelta
from flask_bootstrap import Bootstrap5
from functions.validations import validar_cpf
from functools import wraps
from pathlib import Path
from time import time
from flask_mail import Mail, Message
2025-04-03 11:24:47 -03:00
import os
from dotenv import load_dotenv
from functions.rbac import Role, Permission, init_rbac
from functions.decorators import require_permission, require_role, require_minimum_role, require_login, require_instance_permission, require_instance_access
import re
import secrets
import pyotp
import qrcode
import base64
from io import BytesIO
from create_admin import create_admin
from create_test_users import create_test_users
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import random
import string
from sqlalchemy.sql import func
2025-04-03 11:24:47 -03:00
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', secrets.token_hex(16))
bootstrap = Bootstrap5(app)
# Configurar Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
@login_manager.user_loader
def load_user(user_id):
"""Carrega o usuário pelo ID"""
db = get_db_connection()
try:
# Carregar o usuário com suas roles
user = db.query(Usuario).options(
joinedload(Usuario.roles)
).get(user_id)
return user
finally:
db.close()
def generate_qr_code(user):
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code
# Configuração da sessão do SQLAlchemy
db_session = get_db_connection()
2025-01-08 00:19:49 -03:00
# Configurar Flask-Mail
app.config['MAIL_SERVER'] = os.getenv('MAIL_SERVER', 'smtp.gmail.com')
app.config['MAIL_PORT'] = int(os.getenv('MAIL_PORT', 587))
app.config['MAIL_USE_TLS'] = os.getenv('MAIL_USE_TLS', 'True').lower() == 'true'
app.config['MAIL_USERNAME'] = os.getenv('MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.getenv('MAIL_PASSWORD')
app.config['MAIL_DEFAULT_SENDER'] = os.getenv('MAIL_DEFAULT_SENDER')
mail = Mail(app)
# Inicializar banco de dados e RBAC
print("Inicializando banco de dados...")
init_database()
print("Inicializando sistema RBAC...")
init_rbac()
# Criar admin e usuários de teste
print("Criando usuários iniciais...")
create_admin()
create_test_users()
# Decorator para verificar se o usuário está logado
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Por favor, faça login para acessar esta página.', 'warning')
2025-03-24 14:50:42 -03:00
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
2025-01-08 00:19:49 -03:00
# Decorator para verificar se a sessão expirou
def session_timeout(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if current_user.is_authenticated:
if 'last_activity' not in session:
session['last_activity'] = time()
return f(*args, **kwargs)
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
# Se passaram mais de 30 minutos (configurável)
timeout_minutes = 30
if now - last_activity > timedelta(minutes=timeout_minutes):
# Registrar o logout por timeout
try:
current_user.ultimo_logout = datetime.now()
current_user.motivo_logout = "Timeout de sessão"
db_session.commit()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning')
return redirect(url_for('login'))
# Atualizar timestamp de último acesso
session['last_activity'] = time()
# Atualizar também no banco de dados
try:
current_user.update_last_activity()
db_session.commit()
except Exception as e:
print(f"Erro ao atualizar última atividade: {e}")
return f(*args, **kwargs)
return f(*args, **kwargs)
return decorated_function
# Rota raiz - redireciona para login se não estiver autenticado
@app.route("/")
@require_login
def index():
"""Rota principal"""
return redirect(url_for('home'))
# Rota de login
@app.route("/login", methods=["GET", "POST"])
def login():
"""Rota de login"""
if request.method == "POST":
email = request.form.get("email")
password = request.form.get("password")
2025-03-24 14:50:42 -03:00
if not all([email, password]):
flash("Todos os campos são obrigatórios.", "error")
return redirect(url_for("login"))
db = get_db_connection()
try:
user = db.query(Usuario).filter_by(email=email).first()
2025-03-24 14:50:42 -03:00
if not user or not user.check_password(password):
flash("Email ou senha incorretos.", "error")
return redirect(url_for("login"))
# Atualizar último login
user.ultimo_login = datetime.utcnow()
db.commit()
# Fazer login
login_user(user)
# Redirecionar para home
return redirect(url_for("home"))
finally:
db.close()
return render_template("login.html")
# Rota de logout
@app.route("/logout")
@login_required
def logout():
db = get_db_connection()
try:
user = current_user
if user:
user.logout()
db.commit()
logout_user()
finally:
db.close()
flash('Logout realizado com sucesso!', 'success')
return redirect(url_for('login'))
# Rota home (protegida)
@app.route("/home")
@require_login
def home():
"""Página inicial do sistema com dashboard"""
db = get_db_connection()
try:
# Buscar nome do usuário
usuario = db.query(Usuario).get(session.get('user_id'))
nome_usuario = usuario.username if usuario else "Usuário"
# Formatar data atual em português
data_atual = datetime.now().strftime("%d de %B de %Y")
# Buscar dados para o dashboard
total_militantes = db.query(func.count(Militante.id)).scalar()
total_cotas = db.query(func.sum(CotaMensal.valor)).scalar() or 0
total_materiais = db.query(func.sum(MaterialVendido.valor)).scalar() or 0
total_assinaturas = db.query(func.sum(AssinaturaAnual.valor)).scalar() or 0
<<<<<<< HEAD
return render_template(
"home.html",
nome_usuario=nome_usuario,
data_atual=data_atual,
total_militantes=total_militantes,
total_cotas=total_cotas,
total_materiais=total_materiais,
total_assinaturas=total_assinaturas
)
=======
# Buscar últimos militantes cadastrados
ultimos_militantes = db.query(Militante)\
.order_by(Militante.id.desc())\
.limit(5)\
.all()
# Buscar últimos pagamentos
ultimos_pagamentos = db.query(Pagamento)\
.join(Militante)\
.order_by(Pagamento.data_pagamento.desc())\
.limit(5)\
.all()
# Buscar tipos de pagamento
tipos_pagamento = db.query(TipoPagamento).all()
return render_template('home.html',
nome_usuario=nome_usuario,
data_atual=data_formatada,
total_militantes=total_militantes,
total_cotas="{:.2f}".format(total_cotas),
total_materiais=total_materiais,
total_assinaturas=total_assinaturas,
ultimos_militantes=ultimos_militantes,
ultimos_pagamentos=ultimos_pagamentos,
tipos_pagamento=tipos_pagamento)
except Exception as e:
print(f"Erro na página inicial: {e}")
import traceback
traceback.print_exc()
flash('Erro ao carregar a página inicial', 'danger')
return render_template('home.html',
nome_usuario="Usuário",
data_atual=datetime.now().strftime("%d/%m/%Y"),
total_militantes=0,
total_cotas="0.00",
total_materiais=0,
total_assinaturas=0,
ultimos_militantes=[],
ultimos_pagamentos=[])
>>>>>>> 324660d (refactor: melhorias na interface e funcionalidades - Atualização do layout do dashboard com Bootstrap 5 - Remoção do template editar_pagamento.html (integrado ao modal) - Melhorias no template home.html com cards estatísticos - Ajustes nos estilos e responsividade - Correções nas rotas e conexões do banco de dados - Implementação do modal de edição de pagamentos - Adição de efeitos hover e melhorias visuais)
finally:
db.close()
# Rota para criar um novo militante
@app.route('/militantes/criar', methods=['GET', 'POST'])
@require_login
@require_permission('gerenciar_militantes')
def criar_militante():
if request.method == 'POST':
try:
nome = request.form['nome']
email = request.form['email']
cpf = request.form['cpf']
titulo_eleitoral = request.form['titulo_eleitoral']
data_nascimento = datetime.strptime(request.form['data_nascimento'], '%Y-%m-%d').date() if request.form['data_nascimento'] else None
data_entrada_oci = datetime.strptime(request.form['data_entrada_oci'], '%Y-%m-%d').date() if request.form['data_entrada_oci'] else None
data_efetivacao_oci = datetime.strptime(request.form['data_efetivacao_oci'], '%Y-%m-%d').date() if request.form['data_efetivacao_oci'] else None
telefone1 = request.form['telefone1']
telefone2 = request.form['telefone2']
profissao = request.form['profissao']
regime_trabalho = request.form['regime_trabalho']
empresa = request.form['empresa']
contratante = request.form['contratante']
instituicao_ensino = request.form['instituicao_ensino']
tipo_instituicao = request.form['tipo_instituicao']
sindicato = request.form['sindicato']
cargo_sindical = request.form['cargo_sindical']
dirigente_sindical = 'dirigente_sindical' in request.form
central_sindical = request.form['central_sindical']
setor_id = request.form['setor_id']
celula_id = request.form['celula_id']
2025-04-03 11:24:47 -03:00
# Processar responsabilidades
responsabilidades = 0
if 'responsabilidades' in request.form:
for responsabilidade in request.form.getlist('responsabilidades'):
responsabilidades |= int(responsabilidade)
2025-04-03 11:24:47 -03:00
militante = Militante(
nome=nome,
email=email,
2025-04-03 11:24:47 -03:00
cpf=cpf,
titulo_eleitoral=titulo_eleitoral,
data_nascimento=data_nascimento,
data_entrada_oci=data_entrada_oci,
data_efetivacao_oci=data_efetivacao_oci,
telefone1=telefone1,
telefone2=telefone2,
profissao=profissao,
regime_trabalho=regime_trabalho,
empresa=empresa,
contratante=contratante,
instituicao_ensino=instituicao_ensino,
tipo_instituicao=tipo_instituicao,
sindicato=sindicato,
cargo_sindical=cargo_sindical,
dirigente_sindical=dirigente_sindical,
central_sindical=central_sindical,
setor_id=setor_id,
celula_id=celula_id,
responsabilidades=responsabilidades
2025-04-03 11:24:47 -03:00
)
db_session.add(militante)
db_session.commit()
2025-04-03 11:24:47 -03:00
flash('Militante criado com sucesso!', 'success')
return redirect(url_for('listar_militantes'))
2025-04-03 11:24:47 -03:00
except Exception as e:
db_session.rollback()
flash(f'Erro ao criar militante: {str(e)}', 'danger')
return redirect(url_for('criar_militante'))
setores = Setor.query.all()
celulas = Celula.query.all()
return render_template('criar_militante.html', setores=setores, celulas=celulas)
2025-01-08 00:19:49 -03:00
# Rota para listar militantes
2025-01-08 00:19:49 -03:00
@app.route("/militantes")
@require_login
@require_permission(Permission.VIEW_CELL_DATA)
def listar_militantes():
try:
print("Buscando militantes...")
db = get_db_connection()
militantes = db.query(Militante).order_by(Militante.nome).all()
print(f"Total de militantes encontrados: {len(militantes)}")
for militante in militantes:
print(f"Militante: {militante.nome} (ID: {militante.id})")
return render_template("listar_militantes.html", militantes=militantes)
except Exception as e:
print(f"Erro ao listar militantes: {e}")
flash('Erro ao listar militantes', 'danger')
return render_template("listar_militantes.html", militantes=[])
finally:
db.close()
# Rota para excluir militante
@app.route("/militantes/excluir/<int:id>", methods=["POST"])
@login_required
@session_timeout
def excluir_militante(id):
try:
db = get_db_connection()
militante = db.query(Militante).get(id)
if not militante:
flash('Militante não encontrado', 'danger')
return redirect(url_for('listar_militantes'))
db.delete(militante)
db.commit()
flash('Militante excluído com sucesso!', 'success')
except Exception as e:
db.rollback()
print(f"Erro ao excluir militante: {e}")
flash('Erro ao excluir militante', 'danger')
finally:
db.close()
return redirect(url_for('listar_militantes'))
2025-01-08 00:19:49 -03:00
# Rota para criar uma nova cota
2025-01-08 00:19:49 -03:00
@app.route("/cotas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def nova_cota():
2025-01-08 00:19:49 -03:00
if request.method == "POST":
militante_id = request.form.get("militante_id")
valor_antigo = float(request.form.get("valor_antigo"))
valor_novo = float(request.form.get("valor_novo"))
data_alteracao = datetime.strptime(request.form.get("data_alteracao"), "%Y-%m-%d").date()
data_vencimento = datetime.strptime(request.form.get("data_vencimento"), "%Y-%m-%d").date()
db = get_db_connection()
try:
cotas_mensais = CotaMensal(
militante_id=militante_id,
valor_antigo=valor_antigo,
valor_novo=valor_novo,
data_alteracao=data_alteracao,
data_vencimento=data_vencimento,
pago=False
)
db.add(cotas_mensais)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Cota cadastrada com sucesso!'
})
flash('Cota cadastrada com sucesso!', 'success')
return redirect(url_for('listar_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar cota: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar cota. Verifique os dados e tente novamente.'
}), 400
flash('Erro ao cadastrar cota', 'danger')
return render_template("nova_cota.html")
finally:
db.close()
2025-01-08 00:19:49 -03:00
db = get_db_connection()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("nova_cota.html", militantes=militantes)
finally:
db.close()
# Rota para listar cotas
2025-01-08 00:19:49 -03:00
@app.route("/cotas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_cotas():
try:
print("Buscando cotas...")
# Buscar cotas com informações do militante
db = get_db_connection()
cotas = db.query(CotaMensal)\
.join(Militante)\
.order_by(CotaMensal.data_vencimento.desc())\
.all()
print(f"Total de cotas encontradas: {len(cotas)}")
# Calcular status de cada cota
for cota in cotas:
if cota.pago:
cota.status = "paga"
elif cota.data_vencimento < datetime.now().date():
cota.status = "atrasada"
else:
cota.status = "pendente"
print(f"Cota {cota.id}: Militante={cota.militante.nome}, Valor={cota.valor_novo}, Status={cota.status}")
# Buscar militantes para o modal de nova cota
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("listar_cotas.html", cotas=cotas, militantes=militantes)
except Exception as e:
print(f"Erro ao listar cotas: {e}")
flash('Erro ao listar cotas', 'danger')
return render_template("listar_cotas.html", cotas=[], militantes=[])
finally:
db.close()
# Rota para editar cota
@app.route('/cotas/editar/<int:id>', methods=['GET', 'POST'])
@login_required
@session_timeout
def editar_cota(id):
db = get_db_connection()
try:
cota = db.query(CotaMensal).get_or_404(id)
if request.method == 'POST':
try:
cota.militante_id = int(request.form['militante_id'])
cota.valor_antigo = float(request.form['valor_antigo'])
cota.valor_novo = float(request.form['valor_novo'])
cota.data_alteracao = datetime.strptime(request.form['data_alteracao'], '%Y-%m-%d').date()
cota.data_vencimento = datetime.strptime(request.form['data_vencimento'], '%Y-%m-%d').date()
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Cota atualizada com sucesso!'
})
flash('Cota atualizada com sucesso!', 'success')
return redirect(url_for('listar_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao atualizar cota: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao atualizar cota. Verifique os dados e tente novamente.'
}), 400
flash('Erro ao atualizar cota. Verifique os dados e tente novamente.', 'danger')
return redirect(url_for('editar_cota', id=id))
return render_template('editar_cota.html', cota=cota)
except Exception as e:
print(f"Erro ao carregar cota: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao carregar cota.'
}), 404
flash('Erro ao carregar cota', 'danger')
return redirect(url_for('listar_cotas'))
finally:
db.close()
2025-01-08 00:19:49 -03:00
# Rota para criar um novo pagamento
2025-01-08 00:19:49 -03:00
@app.route("/pagamentos/novo", methods=["GET", "POST"])
@require_login
def novo_pagamento():
2025-01-08 00:19:49 -03:00
if request.method == "POST":
militante_id = request.form.get("militante_id")
tipo_pagamento_id = request.form.get("tipo_pagamento_id")
valor = float(request.form.get("valor"))
data_pagamento = datetime.strptime(request.form.get("data_pagamento"), "%Y-%m-%d").date()
# Criar novo pagamento
db = get_db_connection()
try:
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento_id=tipo_pagamento_id,
valor=valor,
data_pagamento=data_pagamento
)
db.add(pagamento)
db.commit()
flash('Pagamento cadastrado com sucesso!', 'success')
return redirect(url_for('listar_pagamentos'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar pagamento: {e}")
flash('Erro ao cadastrar pagamento', 'danger')
return render_template("novo_pagamento.html")
finally:
db.close()
# GET - Renderizar formulário
db = get_db_connection()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all()
return render_template("novo_pagamento.html", militantes=militantes, tipos_pagamento=tipos_pagamento)
finally:
db.close()
# Rota para listar pagamentos
2025-01-08 00:19:49 -03:00
@app.route("/pagamentos")
@require_login
def listar_pagamentos():
try:
db = get_db_connection()
pagamentos = db.query(Pagamento).order_by(Pagamento.data_pagamento.desc()).all()
return render_template("listar_pagamentos.html", pagamentos=pagamentos)
except Exception as e:
print(f"Erro ao listar pagamentos: {e}")
flash('Erro ao listar pagamentos', 'danger')
return render_template("listar_pagamentos.html", pagamentos=[])
finally:
db.close()
2025-01-08 00:19:49 -03:00
# Rota para criar um novo material vendido
2025-01-08 00:19:49 -03:00
@app.route("/materiais/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_material():
db = get_db_connection()
try:
militante_id = request.form.get('militante_id')
tipo_material_id = request.form.get('tipo_material_id')
descricao = request.form.get('descricao')
valor = float(request.form.get('valor'))
data_venda = datetime.strptime(request.form.get('data_venda'), '%Y-%m-%d')
material = MaterialVendido(
militante_id=militante_id,
tipo_material_id=tipo_material_id,
descricao=descricao,
valor=valor,
data_venda=data_venda
)
db.add(material)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Material cadastrado com sucesso!'
})
flash('Material cadastrado com sucesso!', 'success')
return redirect(url_for('listar_materiais'))
except Exception as e:
db.rollback()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar material. Por favor, tente novamente.'
}), 400
flash('Erro ao cadastrar material. Por favor, tente novamente.', 'error')
return redirect(url_for('listar_materiais'))
finally:
db.close()
2025-01-08 00:19:49 -03:00
# Rota para listar materiais vendidos
2025-01-08 00:19:49 -03:00
@app.route("/materiais")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_materiais():
db = get_db_connection()
try:
materiais = db.query(MaterialVendido).join(Militante).join(TipoMaterial).all()
militantes = db.query(Militante).all()
tipos_material = db.query(TipoMaterial).all()
return render_template('listar_materiais.html',
materiais=materiais,
militantes=militantes,
tipos_material=tipos_material)
finally:
db.close()
2025-01-08 00:19:49 -03:00
# Rota para criar uma nova venda de jornal
2025-01-08 00:19:49 -03:00
@app.route("/jornais/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def nova_venda_jornal():
db = get_db_connection()
try:
militante_id = request.form.get('militante_id')
quantidade = int(request.form.get('quantidade'))
valor_total = float(request.form.get('valor_total'))
data_venda = datetime.strptime(request.form.get('data_venda'), '%Y-%m-%d')
venda = VendaJornalAvulso(
militante_id=militante_id,
quantidade=quantidade,
valor_total=valor_total,
data_venda=data_venda
)
db.add(venda)
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Venda cadastrada com sucesso!'
})
flash('Venda cadastrada com sucesso!', 'success')
return redirect(url_for('listar_vendas_jornal'))
except Exception as e:
db.rollback()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'error',
'message': 'Erro ao cadastrar venda. Por favor, tente novamente.'
}), 400
flash('Erro ao cadastrar venda. Por favor, tente novamente.', 'error')
return redirect(url_for('listar_vendas_jornal'))
finally:
db.close()
# Rota para listar vendas de jornal
2025-01-08 00:19:49 -03:00
@app.route("/jornais")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_vendas_jornal():
db = get_db_connection()
try:
vendas = db.query(VendaJornalAvulso).join(Militante).all()
militantes = db.query(Militante).all()
return render_template('listar_vendas_jornal.html',
vendas=vendas,
militantes=militantes)
finally:
db.close()
2025-01-08 00:19:49 -03:00
# Rota para criar um novo relatório de cotas
2025-01-08 00:19:49 -03:00
@app.route("/relatorios/cotas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_relatorio_cotas():
2025-01-08 00:19:49 -03:00
if request.method == "POST":
setor_id = request.form.get("setor_id")
comite_id = request.form.get("comite_id")
total_cotas = float(request.form.get("total_cotas"))
data_relatorio = datetime.strptime(request.form.get("data_relatorio"), "%Y-%m-%d").date()
db = get_db_connection()
try:
relatorio_cotas_mensais = RelatorioCotasMensais(
setor_id=setor_id,
comite_id=comite_id,
total_cotas=total_cotas,
data_relatorio=data_relatorio
)
db.add(relatorio_cotas_mensais)
db.commit()
flash('Relatório de cotas cadastrado com sucesso!', 'success')
return redirect(url_for('listar_relatorios_cotas'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar relatório de cotas: {e}")
flash('Erro ao cadastrar relatório de cotas', 'danger')
return render_template("novo_relatorio_cotas.html")
finally:
db.close()
db = get_db_connection()
try:
setores = db.query(Setor).order_by(Setor.nome).all()
comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all()
return render_template("novo_relatorio_cotas.html", setores=setores, comites=comites)
finally:
db.close()
# Rota para listar relatórios de cotas
2025-01-08 00:19:49 -03:00
@app.route("/relatorios/cotas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_relatorios_cotas():
try:
db = get_db_connection()
relatorios = db.query(RelatorioCotasMensais).order_by(RelatorioCotasMensais.data_relatorio.desc()).all()
return render_template("listar_relatorios_cotas.html", relatorios=relatorios)
except Exception as e:
print(f"Erro ao listar relatórios de cotas: {e}")
flash('Erro ao listar relatórios de cotas', 'danger')
return render_template("listar_relatorios_cotas.html", relatorios=[])
finally:
db.close()
2025-01-08 00:19:49 -03:00
# Rota para criar um novo relatório de vendas
2025-01-08 00:19:49 -03:00
@app.route("/relatorios/vendas/novo", methods=["GET", "POST"])
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def novo_relatorio_vendas():
2025-01-08 00:19:49 -03:00
if request.method == "POST":
setor_id = request.form.get("setor_id")
comite_id = request.form.get("comite_id")
total_vendas = float(request.form.get("total_vendas"))
data_relatorio = datetime.strptime(request.form.get("data_relatorio"), "%Y-%m-%d").date()
db = get_db_connection()
try:
relatorio_vendas_materiais = RelatorioVendasMateriais(
setor_id=setor_id,
comite_id=comite_id,
total_vendas=total_vendas,
data_relatorio=data_relatorio
)
db.add(relatorio_vendas_materiais)
db.commit()
flash('Relatório de vendas cadastrado com sucesso!', 'success')
return redirect(url_for('listar_relatorios_vendas'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar relatório de vendas: {e}")
flash('Erro ao cadastrar relatório de vendas', 'danger')
return render_template("novo_relatorio_vendas.html")
finally:
db.close()
db = get_db_connection()
try:
setores = db.query(Setor).order_by(Setor.nome).all()
comites = db.query(ComiteCentral).order_by(ComiteCentral.nome).all()
return render_template("novo_relatorio_vendas.html", setores=setores, comites=comites)
finally:
db.close()
# Rota para listar relatórios de vendas
2025-01-08 00:19:49 -03:00
@app.route("/relatorios/vendas")
@require_login
@require_permission(Permission.VIEW_CELL_REPORTS)
def listar_relatorios_vendas():
try:
db = get_db_connection()
relatorios = db.query(RelatorioVendasMateriais).order_by(RelatorioVendasMateriais.data_relatorio.desc()).all()
return render_template("listar_relatorios_vendas.html", relatorios=relatorios)
except Exception as e:
print(f"Erro ao listar relatórios de vendas: {e}")
flash('Erro ao listar relatórios de vendas', 'danger')
return render_template("listar_relatorios_vendas.html", relatorios=[])
finally:
db.close()
# Rota para editar militante
@app.route("/militantes/editar/<int:id>", methods=["GET", "POST"])
@login_required
@session_timeout
def editar_militante(id):
db = get_db_connection()
try:
militante = db.query(Militante).get(id)
if not militante:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'status': 'error', 'message': 'Militante não encontrado'}), 404
flash('Militante não encontrado', 'danger')
return redirect(url_for('listar_militantes'))
if request.method == "POST":
nome = request.form.get("nome")
cpf = request.form.get("cpf")
email = request.form.get("email")
telefone = request.form.get("telefone")
endereco = request.form.get("endereco")
filiado = request.form.get("filiado") == "on"
# Validar CPF
if not validar_cpf(cpf):
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'status': 'error', 'message': 'CPF inválido'}), 400
flash('CPF inválido', 'danger')
return render_template('editar_militante.html', militante=militante)
# Verificar se já existe outro militante com este CPF
militante_existente = db.query(Militante).filter(
Militante.cpf == cpf,
Militante.id != id
).first()
if militante_existente:
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'status': 'error', 'message': 'CPF já cadastrado para outro militante'}), 400
flash('CPF já cadastrado para outro militante', 'danger')
return render_template('editar_militante.html', militante=militante)
try:
militante.nome = nome
militante.cpf = cpf
militante.email = email
militante.telefone = telefone
militante.endereco = endereco
militante.filiado = filiado
db.commit()
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({
'status': 'success',
'message': 'Militante atualizado com sucesso',
'militante': {
'id': militante.id,
'nome': militante.nome,
'cpf': militante.cpf,
'email': militante.email,
'telefone': militante.telefone,
'endereco': militante.endereco,
'filiado': militante.filiado
}
})
flash('Militante atualizado com sucesso!', 'success')
return redirect(url_for('listar_militantes'))
except Exception as e:
db.rollback()
print(f"Erro ao atualizar militante: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'status': 'error', 'message': 'Erro ao atualizar militante'}), 500
flash('Erro ao atualizar militante', 'danger')
return render_template('editar_militante.html', militante=militante)
return render_template('editar_militante.html', militante=militante)
except Exception as e:
print(f"Erro ao editar militante: {e}")
if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'status': 'error', 'message': 'Erro ao carregar militante'}), 500
flash('Erro ao carregar militante', 'danger')
return redirect(url_for('listar_militantes'))
finally:
db.close()
# Rota para criar um novo usuário
@app.route("/usuarios/novo", methods=["GET", "POST"])
@login_required
def novo_usuario():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
email = request.form.get("email")
role_id = request.form.get("role_id")
setor_id = request.form.get("setor_id")
# Verificar se usuário já existe
db = get_db_connection()
try:
if db.query(Usuario).filter_by(username=username).first():
flash('Nome de usuário já existe.', 'danger')
return render_template("novo_usuario.html")
novo_usuario = Usuario(
username=username,
email=email,
role_id=role_id,
setor_id=setor_id
)
novo_usuario.set_password(password)
novo_usuario.otp_secret = pyotp.random_base32()
db.add(novo_usuario)
db.commit()
flash('Usuário cadastrado com sucesso!', 'success')
return redirect(url_for('listar_usuarios'))
except Exception as e:
db.rollback()
print(f"Erro ao cadastrar usuário: {e}")
flash('Erro ao cadastrar usuário', 'danger')
return render_template("novo_usuario.html")
finally:
db.close()
db = get_db_connection()
try:
roles = db.query(Role).order_by(Role.nome).all()
setores = db.query(Setor).order_by(Setor.nome).all()
return render_template("novo_usuario.html", roles=roles, setores=setores)
finally:
db.close()
# Rota para verificar status da sessão
@app.route('/check_session')
def check_session():
if 'user_id' not in session:
return jsonify({'status': 'expired'})
if 'last_activity' in session:
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
if now - last_activity > timedelta(hours=2):
# Registrar o logout por timeout
try:
db = get_db_connection()
user = db.query(Usuario).get(session.get('user_id'))
if user:
user.ultimo_logout = datetime.now()
user.motivo_logout = "Timeout de sessão"
db.commit()
db.close()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
return jsonify({'status': 'expired'})
return jsonify({'status': 'active'})
@app.route("/qr/<token>")
def get_qr_code(token):
db = get_db_connection()
try:
user = db.query(Usuario).filter_by(username='admin').first()
if not user:
flash('Usuário não encontrado', 'error')
return redirect(url_for('login'))
qr_uri = user.get_otp_uri()
return render_template('mostrar_qr_code.html', qr_uri=qr_uri)
finally:
db.close()
2025-04-03 11:24:47 -03:00
# Adicionar nova rota para API de setores
@app.route("/api/setores/<int:cr_id>")
@require_login
2025-04-03 11:24:47 -03:00
def get_setores(cr_id):
setores = db_session.query(Setor).filter_by(cr_id=cr_id).all()
return jsonify({
'setores': [{'id': s.id, 'nome': s.nome} for s in setores]
})
@app.route('/celulas/<int:celula_id>/militantes')
@require_login
def list_militantes_celula(celula_id):
@require_instance_access('celula', celula_id)
def decorated_function(celula_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(celula_id=celula_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(celula_id)
@app.route('/setores/<int:setor_id>/militantes')
@require_login
def list_militantes_setor(setor_id):
@require_instance_access('setor', setor_id)
def decorated_function(setor_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(setor_id=setor_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(setor_id)
@app.route('/crs/<int:cr_id>/militantes')
@require_login
def list_militantes_cr(cr_id):
@require_instance_access('cr', cr_id)
def decorated_function(cr_id):
db = get_db_connection()
try:
militantes = db.query(Usuario).filter_by(cr_id=cr_id).all()
return render_template('militantes/list.html', militantes=militantes)
finally:
db.close()
return decorated_function(cr_id)
@app.route('/celulas/<int:celula_id>/pagamentos')
@require_login
def list_pagamentos_celula(celula_id):
@require_instance_access('celula', celula_id)
def decorated_function(celula_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(celula_id)
@app.route('/setores/<int:setor_id>/pagamentos')
@require_login
def list_pagamentos_setor(setor_id):
@require_instance_access('setor', setor_id)
def decorated_function(setor_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(setor_id)
@app.route('/crs/<int:cr_id>/pagamentos')
@require_login
def list_pagamentos_cr(cr_id):
@require_instance_access('cr', cr_id)
def decorated_function(cr_id):
db = get_db_connection()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
finally:
db.close()
return decorated_function(cr_id)
@app.route('/celulas/<int:celula_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_CELL_PAYMENT', 'celula_id')
def novo_pagamento_celula(celula_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
celula_id=celula_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_celula', celula_id=celula_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route('/setores/<int:setor_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_SECTOR_PAYMENT', 'setor_id')
def novo_pagamento_setor(setor_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
setor_id=setor_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_setor', setor_id=setor_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route('/crs/<int:cr_id>/pagamentos/novo', methods=['GET', 'POST'])
@require_login
@require_instance_permission('REGISTER_CR_PAYMENT', 'cr_id')
def novo_pagamento_cr(cr_id):
if request.method == 'POST':
db = get_db_connection()
try:
pagamento = Pagamento(
valor=request.form['valor'],
data=request.form['data'],
militante_id=request.form['militante_id'],
cr_id=cr_id
)
db.add(pagamento)
db.commit()
flash('Pagamento registrado com sucesso!', 'success')
return redirect(url_for('list_pagamentos_cr', cr_id=cr_id))
finally:
db.close()
return render_template('pagamentos/form.html')
@app.route("/alterar_senha", methods=["GET", "POST"])
@require_login
def alterar_senha():
"""Rota para alterar a senha do usuário"""
if request.method == "POST":
senha_atual = request.form.get("senha_atual")
nova_senha = request.form.get("nova_senha")
confirmar_senha = request.form.get("confirmar_senha")
if not all([senha_atual, nova_senha, confirmar_senha]):
flash("Todos os campos são obrigatórios.", "error")
return redirect(url_for("alterar_senha"))
if nova_senha != confirmar_senha:
flash("As senhas não coincidem.", "error")
return redirect(url_for("alterar_senha"))
db = get_db_connection()
try:
user = db.query(Usuario).get(current_user.id)
if not user.check_password(senha_atual):
flash("Senha atual incorreta.", "error")
return redirect(url_for("alterar_senha"))
user.password_hash = generate_password_hash(nova_senha)
db.commit()
flash("Senha alterada com sucesso!", "success")
return redirect(url_for("home"))
finally:
db.close()
return render_template("alterar_senha.html")
@app.route('/usuarios/<int:user_id>/toggle_status', methods=['POST'])
@require_login
def toggle_user_status(user_id):
user = db_session.query(Usuario).get_or_404(user_id)
# Verificar permissões baseado na hierarquia
if not current_user.has_permission('system_config'):
if current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode gerenciar membros do seu CR
if user.cr_id != current_user.cr_id:
flash('Você não tem permissão para gerenciar este usuário.', 'danger')
return redirect(url_for('dashboard_admin'))
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode gerenciar membros do seu setor
if user.setor_id != current_user.setor_id:
flash('Você não tem permissão para gerenciar este usuário.', 'danger')
return redirect(url_for('dashboard_admin'))
elif current_user.has_permission('manage_cell_members'):
# Secretário de Célula só pode gerenciar membros da sua célula
if user.celula_id != current_user.celula_id:
flash('Você não tem permissão para gerenciar este usuário.', 'danger')
return redirect(url_for('dashboard_admin'))
else:
# Militante básico não pode gerenciar ninguém
flash('Você não tem permissão para gerenciar usuários.', 'danger')
return redirect(url_for('dashboard_admin'))
user.ativo = not user.ativo
db_session.commit()
return jsonify({'success': True, 'message': 'Status do usuário alterado com sucesso!'})
@app.route('/usuarios/<int:user_id>/alterar_nivel', methods=['POST'])
@require_login
def alterar_nivel(user_id):
user = db_session.query(Usuario).get_or_404(user_id)
novo_nivel = request.json.get('nivel')
# Verificar permissões baseado na hierarquia
if not current_user.has_permission('system_config'):
if current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode alterar níveis dentro do seu CR
if user.cr_id != current_user.cr_id:
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'})
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode alterar níveis dentro do seu setor
if user.setor_id != current_user.setor_id:
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar o nível deste usuário.'})
else:
# Outros níveis não podem alterar níveis
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar níveis de usuários.'})
# Verificar se o novo nível é válido para o nível hierárquico do usuário atual
if current_user.has_permission('system_config'):
# Secretário Geral e Secretário de Organização podem alterar para qualquer nível
pass
elif current_user.has_permission('manage_cr_sectors'):
# Secretário de CR só pode alterar para níveis do CR
if novo_nivel not in ['membro_cr', 'secretario_cr']:
return jsonify({'success': False, 'message': 'Nível inválido para este CR.'})
elif current_user.has_permission('manage_sector_cells'):
# Secretário de Setor só pode alterar para níveis do setor
if novo_nivel not in ['membro_setor', 'secretario_setor']:
return jsonify({'success': False, 'message': 'Nível inválido para este setor.'})
# Atualizar o nível do usuário
user.role = novo_nivel
db_session.commit()
return jsonify({'success': True, 'message': 'Nível do usuário alterado com sucesso!'})
@app.route('/usuarios/<int:user_id>/toggle_quadro_orientador', methods=['POST'])
@require_login
def toggle_quadro_orientador(user_id):
db = get_db_connection()
try:
user = db.query(Usuario).get(user_id)
if not user:
return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404
# Verificar permissões
if not (current_user.has_permission('system_config') or
(current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or
(current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)):
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar esta responsabilidade'}), 403
# Verificar se o usuário tem um militante associado
if not user.militante:
return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400
# Alternar o status de Quadro-Orientador
user.militante.quadro_orientador = not user.militante.quadro_orientador
# Atualizar a responsabilidade no campo responsabilidades
if user.militante.quadro_orientador:
user.militante.responsabilidades |= Militante.QUADRO_ORIENTADOR
else:
user.militante.responsabilidades &= ~Militante.QUADRO_ORIENTADOR
db.commit()
return jsonify({
'success': True,
'message': f'Responsabilidade de Quadro-Orientador {"adicionada" if user.militante.quadro_orientador else "removida"} com sucesso'
})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 500
finally:
db.close()
<<<<<<< HEAD
@app.route('/usuarios/<int:user_id>/toggle_aspirante', methods=['POST'])
@require_login
def toggle_aspirante(user_id):
db = get_db_connection()
try:
user = db.query(Usuario).get(user_id)
if not user:
return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404
# Verificar permissões
if not (current_user.has_permission('system_config') or
(current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or
(current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)):
return jsonify({'success': False, 'message': 'Você não tem permissão para alterar este status'}), 403
# Verificar se o usuário tem um militante associado
if not user.militante:
return jsonify({'success': False, 'message': 'Usuário não tem um militante associado'}), 400
# Se estiver tentando remover o status de aspirante
if user.militante.aspirante:
# Verificar se já passaram 3 meses
if datetime.utcnow() - user.militante.data_inicio_aspirante < timedelta(days=90):
return jsonify({
'success': False,
'message': 'Não é possível remover o status de aspirante antes de 3 meses de integração'
}), 400
# Verificar se há avaliação
if not user.militante.avaliacao_aspirante:
return jsonify({
'success': False,
'message': 'É necessário registrar uma avaliação antes de remover o status de aspirante'
}), 400
# Alternar o status de Aspirante
user.militante.aspirante = not user.militante.aspirante
# Atualizar a responsabilidade no campo responsabilidades
if user.militante.aspirante:
user.militante.responsabilidades |= Militante.ASPIRANTE
user.militante.data_inicio_aspirante = datetime.utcnow()
user.militante.avaliacao_aspirante = None
user.militante.data_avaliacao_aspirante = None
else:
user.militante.responsabilidades &= ~Militante.ASPIRANTE
user.militante.data_avaliacao_aspirante = datetime.utcnow()
db.commit()
return jsonify({
'success': True,
'message': f'Status de Aspirante {"adicionado" if user.militante.aspirante else "removido"} com sucesso'
})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 500
finally:
db.close()
@app.route('/usuarios/<int:user_id>/avaliar_aspirante', methods=['POST'])
@require_login
def avaliar_aspirante(user_id):
db = get_db_connection()
try:
user = db.query(Usuario).get(user_id)
if not user:
return jsonify({'success': False, 'message': 'Usuário não encontrado'}), 404
# Verificar permissões
if not (current_user.has_permission('system_config') or
(current_user.has_permission('manage_cr_sectors') and user.cr_id == current_user.cr_id) or
(current_user.has_permission('manage_sector_cells') and user.setor_id == current_user.setor_id)):
return jsonify({'success': False, 'message': 'Você não tem permissão para avaliar este aspirante'}), 403
# Verificar se o usuário tem um militante associado e é aspirante
if not user.militante or not user.militante.aspirante:
return jsonify({'success': False, 'message': 'Usuário não é um aspirante'}), 400
# Verificar se já passaram 3 meses
if datetime.utcnow() - user.militante.data_inicio_aspirante < timedelta(days=90):
return jsonify({
'success': False,
'message': 'Não é possível avaliar o aspirante antes de 3 meses de integração'
}), 400
# Obter a avaliação do corpo da requisição
avaliacao = request.json.get('avaliacao')
if not avaliacao:
return jsonify({'success': False, 'message': 'A avaliação é obrigatória'}), 400
# Atualizar a avaliação
user.militante.avaliacao_aspirante = avaliacao
user.militante.data_avaliacao_aspirante = datetime.utcnow()
db.commit()
return jsonify({
'success': True,
'message': 'Avaliação registrada com sucesso'
})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 500
finally:
db.close()
@app.template_filter('has_permission')
def has_permission(value, permission):
"""Verifica se o valor contém a permissão especificada."""
return bool(value & permission)
@app.route('/militante/desligar/<int:id>', methods=['POST'])
@login_required
def desligar_militante(id):
"""Desliga um militante e desativa seu usuário associado"""
militante = db_session.query(Militante).get(id)
if militante is None:
flash('Militante não encontrado.', 'danger')
return redirect(url_for('listar_militantes'))
motivo = request.form.get('motivo')
if not motivo:
flash('É necessário informar o motivo do desligamento.', 'danger')
return redirect(url_for('listar_militantes'))
# Atualizar estado do militante
militante.estado = EstadoMilitante.DESLIGADO.value
militante.data_desligamento = datetime.utcnow()
militante.motivo_desligamento = motivo
# Desativar usuário associado se existir
if militante.usuario:
militante.usuario.ativo = False
militante.usuario.ultimo_logout = datetime.utcnow()
militante.usuario.motivo_logout = "Desligamento do militante"
db_session.commit()
flash('Militante desligado com sucesso.', 'success')
return redirect(url_for('listar_militantes'))
@app.route('/editar_pagamento/<int:id>', methods=['GET', 'POST'])
@require_login
def editar_pagamento(id):
user = current_user
# Verificar permissões do usuário
if not (user.has_permission(Permission.EDIT_CELL_PAYMENT) or
user.has_permission(Permission.EDIT_SECTOR_PAYMENT) or
user.has_permission(Permission.EDIT_CR_PAYMENT) or
user.has_permission(Permission.EDIT_CC_PAYMENT)):
flash('Você não tem permissão para editar pagamentos.', 'error')
return redirect(url_for('home'))
pagamento = db_session.query(Pagamento).get(id)
if not pagamento:
flash('Pagamento não encontrado.', 'error')
return redirect(url_for('listar_pagamentos'))
if request.method == 'POST':
pagamento.valor = request.form['valor']
pagamento.data_pagamento = datetime.strptime(request.form['data_pagamento'], '%Y-%m-%d')
db_session.commit()
flash('Pagamento atualizado com sucesso!', 'success')
return redirect(url_for('listar_pagamentos'))
return render_template('editar_pagamento.html', pagamento=pagamento)
@app.before_request
def session_timeout():
"""Verifica se a sessão expirou"""
if current_user.is_authenticated:
if 'last_activity' not in session:
session['last_activity'] = time()
return
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
# Se passaram mais de 30 minutos (configurável)
timeout_minutes = 30
if now - last_activity > timedelta(minutes=timeout_minutes):
# Registrar o logout por timeout
try:
current_user.ultimo_logout = datetime.now()
current_user.motivo_logout = "Timeout de sessão"
db_session.commit()
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning')
return redirect(url_for('login'))
# Atualizar timestamp de último acesso
session['last_activity'] = time()
# Atualizar também no banco de dados
try:
current_user.update_last_activity()
db_session.commit()
except Exception as e:
print(f"Erro ao atualizar última atividade: {e}")
=======
# API Routes para os modais
@app.route('/api/pagamentos/<int:id>', methods=['GET'])
@login_required
def get_pagamento(id):
db = get_db_connection()
try:
pagamento = db.query(Pagamento).get(id)
if not pagamento:
return jsonify({'success': False, 'message': 'Pagamento não encontrado'}), 404
return jsonify({
'id': pagamento.id,
'valor': pagamento.valor,
'data_pagamento': pagamento.data_pagamento.strftime('%Y-%m-%d'),
'tipo_pagamento_id': pagamento.tipo_pagamento_id,
'observacao': pagamento.observacao
})
finally:
db.close()
@app.route('/api/pagamentos/<int:id>', methods=['PUT'])
@login_required
def update_pagamento(id):
db = get_db_connection()
try:
pagamento = db.query(Pagamento).get(id)
if not pagamento:
return jsonify({'success': False, 'message': 'Pagamento não encontrado'}), 404
try:
pagamento.valor = float(request.form['valor'])
pagamento.data_pagamento = datetime.strptime(request.form['data_pagamento'], '%Y-%m-%d')
pagamento.tipo_pagamento_id = int(request.form['tipo_pagamento'])
pagamento.observacao = request.form['observacao']
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 400
finally:
db.close()
@app.route('/api/pagamentos/<int:id>', methods=['DELETE'])
@login_required
def delete_pagamento(id):
db = get_db_connection()
try:
pagamento = db.query(Pagamento).get(id)
if not pagamento:
return jsonify({'success': False, 'message': 'Pagamento não encontrado'}), 404
try:
db.delete(pagamento)
db.commit()
return jsonify({'success': True})
except Exception as e:
db.rollback()
return jsonify({'success': False, 'message': str(e)}), 400
finally:
db.close()
>>>>>>> 324660d (refactor: melhorias na interface e funcionalidades - Atualização do layout do dashboard com Bootstrap 5 - Remoção do template editar_pagamento.html (integrado ao modal) - Melhorias no template home.html com cards estatísticos - Ajustes nos estilos e responsividade - Correções nas rotas e conexões do banco de dados - Implementação do modal de edição de pagamentos - Adição de efeitos hover e melhorias visuais)
def create_app():
app = Flask(__name__)
# ... existing code ...
# ... existing code ...
return app
def init_system():
"""Inicializa o sistema com todos os usuários necessários"""
print("Inicializando sistema...")
# Inicializar banco de dados
print("Inicializando banco de dados...")
init_database()
# Criar admin
create_admin()
# Criar usuários de teste
create_test_users()
# Verificar configuração
db = get_db_connection()
try:
# Verificar admin
admin = db.query(Usuario).filter_by(username='admin').first()
if admin:
print("\nAdmin configurado:")
print(f"Username: admin")
print(f"Senha: admin123")
if os.path.exists('admin_qr.png'):
print("OTP: Usando configuração existente do arquivo admin_qr.png")
else:
print("OTP: Nova configuração gerada")
# Verificar usuários de teste
test_users = ['aligner', 'tester', 'deployer']
for username in test_users:
user = db.query(Usuario).filter_by(username=username).first()
if user:
print(f"\nUsuário {username} configurado:")
print(f"Username: {username}")
print(f"Senha: Test123!@#")
if user.otp_secret:
print("OTP: Configurado")
else:
print("OTP: Não configurado")
finally:
db.close()
print("\nInstruções:")
print("1. Configure o OTP usando o QR code gerado")
print("2. Faça login com as credenciais fornecidas")
print("3. Altere a senha no primeiro login")
print("\nCredenciais:")
print("Admin:")
print("Username: admin")
print("Senha: admin123")
print("\nUsuários de teste:")
print("Username: aligner, tester, deployer")
print("Senha: Test123!@#")
if __name__ == '__main__':
init_system()
2025-01-08 00:19:49 -03:00
app.run(debug=True)