refactor: Implementa arquitetura MVC limpa

- Separa modelos em entidades individuais
- Cria camada de serviços para acesso a dados
- Implementa controladores para lógica de negócio
- Organiza rotas em blueprints por funcionalidade
- Adiciona documentação de arquitetura no README
- Cria script para preparação da estrutura

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
LS
2025-04-22 16:35:08 -03:00
parent e6057cd566
commit 62aaec3fbe
22 changed files with 2083 additions and 101 deletions

View File

@@ -0,0 +1,124 @@
from flask import session, flash, redirect, url_for, request
from flask_login import login_user, logout_user, current_user
from datetime import datetime
import pyotp
import qrcode
from io import BytesIO
import base64
from models.entities.usuario import Usuario
from services.database_service import DatabaseService
class AuthController:
"""Controlador para funções de autenticação"""
@staticmethod
def login():
"""Processa o login de usuário"""
if request.method != "POST":
return False
email_or_username = request.form.get("email")
password = request.form.get("password")
otp = request.form.get("otp")
if not all([email_or_username, password]):
flash("Email/usuário e senha são obrigatórios.", "danger")
return False
db = DatabaseService.get_db_connection()
try:
# Tenta encontrar o usuário por email ou username
user = db.query(Usuario).filter(
(Usuario.email == email_or_username) |
(Usuario.username == email_or_username)
).first()
if not user or not user.check_password(password):
flash("Email/usuário ou senha incorretos.", "danger")
return False
# Verificar OTP se o usuário tiver configurado
if user.otp_secret and not otp:
flash("Código OTP é obrigatório para sua conta.", "danger")
return False
if user.otp_secret and not user.verify_otp(otp):
flash("Código OTP inválido.", "danger")
return False
# Atualizar último login
user.ultimo_login = datetime.utcnow()
db.commit()
# Fazer login e setar sessão
login_user(user)
session['user_id'] = user.id
session['username'] = user.username
session['is_admin'] = user.is_admin
return True
finally:
db.close()
@staticmethod
def logout():
"""Processa o logout de usuário"""
db = DatabaseService.get_db_connection()
try:
user = current_user
if user.is_authenticated:
user.logout()
db.commit()
logout_user()
flash('Logout realizado com sucesso!', 'success')
return True
finally:
db.close()
@staticmethod
def alterar_senha(user_id, senha_atual, nova_senha, confirmar_senha):
"""Altera a senha do usuário"""
if not all([senha_atual, nova_senha, confirmar_senha]):
flash("Todos os campos são obrigatórios.", "error")
return False
if nova_senha != confirmar_senha:
flash("As senhas não coincidem.", "error")
return False
db = DatabaseService.get_db_connection()
try:
user = db.query(Usuario).get(user_id)
if not user:
flash("Usuário não encontrado.", "error")
return False
if not user.check_password(senha_atual):
flash("Senha atual incorreta.", "error")
return False
user.set_password(nova_senha)
db.commit()
flash("Senha alterada com sucesso!", "success")
return True
finally:
db.close()
@staticmethod
def generate_qr_code(user):
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code

View File

@@ -0,0 +1,80 @@
from flask import session, render_template
from datetime import datetime
from sqlalchemy.sql import func
from models.entities.militante import Militante
from models.entities.cota_mensal import CotaMensal
from models.entities.material_vendido import MaterialVendido
from models.entities.assinatura_anual import AssinaturaAnual
from models.entities.pagamento import Pagamento
from models.entities.tipo_pagamento import TipoPagamento
from models.entities.usuario import Usuario
from services.database_service import DatabaseService
class HomeController:
"""Controlador para página inicial e dashboard"""
@staticmethod
def dashboard():
"""Gera dados para o dashboard principal"""
db = DatabaseService.get_db_connection()
try:
# Buscar nome do usuário
usuario = db.query(Usuario).get(session.get('user_id'))
nome_usuario = usuario.username if usuario else "Usuário"
# Formatar data atual em português
data_atual = datetime.now().strftime("%d de %B de %Y")
# Buscar dados para o dashboard
total_militantes = db.query(Militante).count()
total_cotas = db.query(func.sum(CotaMensal.valor_novo)).scalar() or 0
total_materiais = db.query(MaterialVendido).count()
total_assinaturas = db.query(AssinaturaAnual).count()
# Buscar últimos militantes cadastrados
ultimos_militantes = db.query(Militante)\
.order_by(Militante.id.desc())\
.limit(5)\
.all()
# Buscar últimos pagamentos
ultimos_pagamentos = db.query(Pagamento)\
.join(Militante)\
.order_by(Pagamento.data_pagamento.desc())\
.limit(5)\
.all()
# Buscar tipos de pagamento
tipos_pagamento = db.query(TipoPagamento).all()
return {
'nome_usuario': nome_usuario,
'data_atual': data_atual,
'total_militantes': total_militantes,
'total_cotas': "{:.2f}".format(total_cotas),
'total_materiais': total_materiais,
'total_assinaturas': total_assinaturas,
'ultimos_militantes': ultimos_militantes,
'ultimos_pagamentos': ultimos_pagamentos,
'tipos_pagamento': tipos_pagamento
}
except Exception as e:
print(f"Erro ao carregar dashboard: {e}")
import traceback
traceback.print_exc()
return {
'nome_usuario': "Usuário",
'data_atual': datetime.now().strftime("%d/%m/%Y"),
'total_militantes': 0,
'total_cotas': "0.00",
'total_materiais': 0,
'total_assinaturas': 0,
'ultimos_militantes': [],
'ultimos_pagamentos': [],
'tipos_pagamento': []
}
finally:
db.close()

View File

@@ -0,0 +1,270 @@
from flask import request, jsonify, flash
from datetime import datetime
from werkzeug.exceptions import NotFound
from services.militante_service import MilitanteService
from models.entities.militante import Militante, EstadoMilitante
from models.entities.endereco import Endereco
from models.entities.email_militante import EmailMilitante
from utils.date_utils import validar_data, converter_data, validar_sequencia_datas, calcular_idade
class MilitanteController:
"""Controlador para operações com militantes"""
@staticmethod
def listar_militantes():
"""Lista todos os militantes"""
return MilitanteService.listar_militantes()
@staticmethod
def buscar_militante(militante_id):
"""Busca um militante pelo ID"""
militante = MilitanteService.buscar_militante(militante_id)
if not militante:
raise NotFound(f"Militante com ID {militante_id} não encontrado")
return militante
@staticmethod
def criar_militante(form_data):
"""Cria um novo militante"""
# Validar CPF
from functions.validations import validar_cpf
cpf = form_data.get('cpf')
if not validar_cpf(cpf):
return jsonify({
'status': 'error',
'message': 'CPF inválido'
}), 400
# Verificar se já existe militante com este CPF
if MilitanteService.buscar_por_cpf(cpf):
return jsonify({
'status': 'error',
'message': 'CPF já cadastrado'
}), 400
try:
# Criar endereço
endereco = Endereco(
cep=form_data.get('cep'),
estado=form_data.get('estado'),
cidade=form_data.get('cidade'),
bairro=form_data.get('bairro'),
rua=form_data.get('logradouro'),
numero=form_data.get('numero'),
complemento=form_data.get('complemento')
)
# Salvar endereço para obter ID
endereco_id = MilitanteService.salvar_endereco(endereco)
# Processar datas
data_nascimento = datetime.strptime(form_data.get('data_nascimento'), '%Y-%m-%d') if form_data.get('data_nascimento') else None
data_entrada_oci = datetime.strptime(form_data.get('data_entrada_oci'), '%Y-%m-%d') if form_data.get('data_entrada_oci') else None
data_efetivacao_oci = datetime.strptime(form_data.get('data_efetivacao_oci'), '%Y-%m-%d') if form_data.get('data_efetivacao_oci') else None
# Criar militante
militante = Militante(
# Dados Básicos
nome=form_data.get('nome'),
cpf=cpf,
titulo_eleitoral=form_data.get('titulo_eleitoral'),
data_nascimento=data_nascimento,
data_entrada_oci=data_entrada_oci,
data_efetivacao_oci=data_efetivacao_oci,
# Contato
telefone1=form_data.get('telefone1'),
telefone2=form_data.get('telefone2'),
endereco_id=endereco_id,
# Profissional
profissao=form_data.get('profissao'),
regime_trabalho=form_data.get('regime_trabalho'),
empresa=form_data.get('empresa'),
contratante=form_data.get('contratante'),
# Acadêmico
instituicao_ensino=form_data.get('instituicao_ensino'),
tipo_instituicao=form_data.get('tipo_instituicao'),
# Sindical
sindicato=form_data.get('sindicato'),
cargo_sindical=form_data.get('cargo_sindical'),
central_sindical=form_data.get('central_sindical'),
dirigente_sindical=form_data.get('dirigente_sindical') == 'on',
# Organização
estado=EstadoMilitante(form_data.get('estado', 'ATIVO')),
celula_id=form_data.get('celula_id', type=int),
responsabilidades=form_data.get('responsabilidades', type=int, default=0),
# Por padrão, todo novo militante é aspirante
aspirante=True,
data_inicio_aspirante=datetime.now()
)
# Salvar militante para obter ID
militante_id = MilitanteService.salvar_militante(militante)
# Adicionar email principal se fornecido
email = form_data.get('email')
if email:
email_militante = EmailMilitante(
endereco_email=email,
militante_id=militante_id
)
MilitanteService.salvar_email_militante(email_militante)
return jsonify({
'status': 'success',
'message': 'Militante criado com sucesso!',
'id': militante_id
})
except Exception as e:
return jsonify({
'status': 'error',
'message': f'Erro ao criar militante: {str(e)}'
}), 500
@staticmethod
def atualizar_militante(militante_id, form_data):
"""Atualiza um militante existente"""
try:
militante = MilitanteService.buscar_militante(militante_id)
if not militante:
return jsonify({
'status': 'error',
'message': 'Militante não encontrado'
}), 404
# Obter dados do formulário
nome = form_data.get('nome')
cpf = form_data.get('cpf')
titulo_eleitoral = form_data.get('titulo_eleitoral')
data_nascimento = form_data.get('data_nascimento')
data_entrada_oci = form_data.get('data_entrada_oci')
data_efetivacao_oci = form_data.get('data_efetivacao_oci')
telefone1 = form_data.get('telefone1')
telefone2 = form_data.get('telefone2')
email = form_data.get('email')
# Validar e converter datas
try:
data_nascimento = converter_data(data_nascimento) if data_nascimento else None
data_entrada_oci = converter_data(data_entrada_oci) if data_entrada_oci else None
data_efetivacao_oci = converter_data(data_efetivacao_oci) if data_efetivacao_oci else None
# Validar sequência lógica das datas
validar_sequencia_datas(
data_nascimento=data_nascimento,
data_entrada=data_entrada_oci,
data_efetivacao=data_efetivacao_oci
)
except ValueError as e:
return jsonify({
'status': 'error',
'message': str(e)
}), 400
# Atualizar dados básicos
if nome: militante.nome = nome
if cpf: militante.cpf = cpf
if titulo_eleitoral: militante.titulo_eleitoral = titulo_eleitoral
militante.data_nascimento = data_nascimento
militante.data_entrada_oci = data_entrada_oci
militante.data_efetivacao_oci = data_efetivacao_oci
militante.telefone1 = telefone1
militante.telefone2 = telefone2
# Calcular idade
if data_nascimento:
militante.idade = calcular_idade(data_nascimento)
# Atualizar ou criar email
if email:
MilitanteService.atualizar_email_militante(militante_id, email)
# Salvar alterações
MilitanteService.salvar_militante(militante)
return jsonify({
'status': 'success',
'message': 'Militante atualizado com sucesso',
'data': {
'nome': militante.nome,
'cpf': militante.cpf,
'idade': militante.idade if hasattr(militante, 'idade') else None,
'emails': [e.endereco_email for e in militante.emails],
'telefone1': militante.telefone1,
'celula_id': str(militante.celula_id) if militante.celula_id else None,
'responsabilidades_valor': militante.responsabilidades
}
})
except Exception as e:
return jsonify({
'status': 'error',
'message': f'Erro ao atualizar militante: {str(e)}'
}), 500
@staticmethod
def excluir_militante(militante_id):
"""Exclui um militante"""
try:
if MilitanteService.excluir_militante(militante_id):
flash('Militante excluído com sucesso!', 'success')
return True
else:
flash('Militante não encontrado', 'danger')
return False
except Exception as e:
flash(f'Erro ao excluir militante: {str(e)}', 'danger')
return False
@staticmethod
def buscar_dados_militante(militante_id):
"""Busca os dados de um militante específico"""
militante = MilitanteService.buscar_militante(militante_id)
if not militante:
return jsonify({
'status': 'error',
'message': 'Militante não encontrado'
}), 404
# Função auxiliar para formatar data com validação
def formatar_data_segura(data):
try:
if not data:
return None
return data.strftime('%Y-%m-%d')
except Exception as e:
print(f"Erro ao formatar data: {str(e)}, valor: {data}")
return None
# Formatar datas com validação
data_nascimento = formatar_data_segura(militante.data_nascimento)
data_entrada_oci = formatar_data_segura(militante.data_entrada_oci)
data_efetivacao_oci = formatar_data_segura(militante.data_efetivacao_oci)
return jsonify({
'status': 'success',
'id': militante.id,
'nome': militante.nome,
'cpf': militante.cpf,
'titulo_eleitoral': militante.titulo_eleitoral,
'data_nascimento': data_nascimento,
'data_entrada_oci': data_entrada_oci,
'data_efetivacao_oci': data_efetivacao_oci,
'emails': [email.endereco_email for email in militante.emails] if militante.emails else [],
'telefone1': militante.telefone1,
'telefone2': militante.telefone2,
'celula_id': militante.celula_id,
'responsabilidades_valor': militante.responsabilidades,
'sindicato': militante.sindicato,
'cargo_sindical': militante.cargo_sindical,
'central_sindical': militante.central_sindical,
'dirigente_sindical': militante.dirigente_sindical
})

View File

@@ -0,0 +1,202 @@
from flask import request, jsonify, flash, session
from flask_login import current_user
from datetime import datetime
import secrets
import pyotp
from models.entities.usuario import Usuario
from services.usuario_service import UsuarioService
from services.database_service import DatabaseService
class UsuarioController:
"""Controlador para operações com usuários"""
@staticmethod
def listar_usuarios():
"""Lista todos os usuários do sistema"""
return UsuarioService.listar_usuarios()
@staticmethod
def buscar_usuario(user_id):
"""Busca um usuário pelo ID"""
return UsuarioService.buscar_usuario(user_id)
@staticmethod
def criar_usuario(data):
"""Cria um novo usuário"""
# Verificar campos obrigatórios
required_fields = ['username', 'password', 'email']
for field in required_fields:
if field not in data:
flash(f'Campo {field} é obrigatório.', 'danger')
return False
# Verificar se usuário já existe
if UsuarioService.buscar_por_username(data['username']):
flash('Nome de usuário já existe.', 'danger')
return False
try:
# Criar usuário
usuario = Usuario(
username=data['username'],
email=data['email'],
nome=data.get('nome'),
is_admin=data.get('is_admin', False)
)
# Definir senha
usuario.set_password(data['password'])
# Gerar OTP secret
usuario.otp_secret = pyotp.random_base32()
# Definir outros campos
if 'role_id' in data:
usuario.role_id = data['role_id']
if 'setor_id' in data:
usuario.setor_id = data['setor_id']
# Salvar no banco
result = UsuarioService.salvar_usuario(usuario)
if result:
flash('Usuário cadastrado com sucesso!', 'success')
return True
else:
flash('Erro ao cadastrar usuário.', 'danger')
return False
except Exception as e:
flash(f'Erro ao cadastrar usuário: {str(e)}', 'danger')
return False
@staticmethod
def atualizar_usuario(user_id, data):
"""Atualiza um usuário existente"""
usuario = UsuarioService.buscar_usuario(user_id)
if not usuario:
flash('Usuário não encontrado.', 'danger')
return False
# Atualizar campos
if 'email' in data:
usuario.email = data['email']
if 'nome' in data:
usuario.nome = data['nome']
if 'role_id' in data:
usuario.role_id = data['role_id']
if 'setor_id' in data:
usuario.setor_id = data['setor_id']
if 'is_admin' in data:
usuario.is_admin = data['is_admin']
if 'password' in data and data['password']:
usuario.set_password(data['password'])
# Salvar alterações
result = UsuarioService.salvar_usuario(usuario)
if result:
flash('Usuário atualizado com sucesso!', 'success')
return True
else:
flash('Erro ao atualizar usuário.', 'danger')
return False
@staticmethod
def toggle_status(user_id):
"""Ativa/desativa um usuário"""
if not current_user.is_admin:
return jsonify({
'success': False,
'error': 'Você não tem permissão para alterar o status de usuários.'
}), 403
usuario = UsuarioService.buscar_usuario(user_id)
if not usuario:
return jsonify({
'success': False,
'error': 'Usuário não encontrado.'
}), 404
usuario.ativo = not usuario.ativo
if UsuarioService.salvar_usuario(usuario):
return jsonify({
'success': True,
'message': f'Usuário {\'ativado\' if usuario.ativo else \'desativado\'}'
})
else:
return jsonify({
'success': False,
'error': 'Erro ao salvar alterações.'
}), 500
@staticmethod
def reset_password(user_id):
"""Reseta a senha de um usuário"""
usuario = UsuarioService.buscar_usuario(user_id)
if not usuario:
flash('Usuário não encontrado.', 'danger')
return False, None
# Gerar nova senha
new_password = secrets.token_urlsafe(8)
usuario.set_password(new_password)
# Salvar alterações
if UsuarioService.salvar_usuario(usuario):
return True, new_password
else:
flash('Erro ao resetar senha.', 'danger')
return False, None
@staticmethod
def reset_otp(user_id):
"""Reseta o OTP de um usuário"""
usuario = UsuarioService.buscar_usuario(user_id)
if not usuario:
flash('Usuário não encontrado.', 'danger')
return False
# Gerar novo OTP secret
usuario.otp_secret = pyotp.random_base32()
# Salvar alterações
if UsuarioService.salvar_usuario(usuario):
flash(f'OTP resetado com sucesso para {usuario.email}.', 'success')
return True
else:
flash('Erro ao resetar OTP.', 'danger')
return False
@staticmethod
def check_session():
"""Verifica status da sessão"""
if 'user_id' not in session:
return {'status': 'expired'}
if 'last_activity' in session:
last_activity = datetime.fromtimestamp(session['last_activity'])
now = datetime.now()
if now - last_activity > timedelta(hours=2):
# Registrar o logout por timeout
try:
user = UsuarioService.buscar_usuario(session.get('user_id'))
if user:
user.ultimo_logout = datetime.now()
user.motivo_logout = "Timeout de sessão"
UsuarioService.salvar_usuario(user)
except Exception as e:
print(f"Erro ao registrar logout por timeout: {e}")
session.clear()
return {'status': 'expired'}
return {'status': 'active'}