BREAKING CHANGES: - Sistema de permissões movido do nível de template para nível de dados - Menus sempre visíveis, controle transparente no backend - Templates nunca quebram, sempre renderizam com dados filtrados Features: - ✅ Arquitetura MVC completa implementada - ✅ Controllers com filtragem hierárquica de dados - ✅ Template helpers simplificados (user_can sempre True) - ✅ Controle de acesso baseado na hierarquia organizacional - ✅ Regra especial para tesoureiros (acesso completo) - ✅ Tratamento robusto de erros em todos os controllers Controllers implementados: - militante_controller.py - Filtragem por célula/setor/CR/CC - cota_controller.py - Controle baseado em permissões - material_controller.py - Acesso flexível por nível - pagamento_controller.py - Filtragem organizacional - auth_controller.py - Autenticação com OTP - home_controller.py - Dashboard com estatísticas - usuario_controller.py - Gestão de usuários Templates corrigidos: - listar_cotas.html - URLs corrigidas (nova_cota → cota.nova) - listar_tipos_materiais.html - Variáveis ajustadas (tipos → tipos_materiais) - base.html - Menus sempre visíveis - Diversos templates com correções de URLs e referências Services implementados: - auth_service.py - Lógica de autenticação - dashboard_service.py - Estatísticas do dashboard - cache_service.py - Integração com Redis - celula_service.py - Operações de células Models implementados: - militante_model.py - Operações de militantes - pagamento_model.py - Operações de pagamentos Documentação: - docs/permission_fixes_summary.md - Resumo completo das correções - docs/architecture_summary.md - Arquitetura MVC - docs/mvc_refactoring.md - Detalhes da refatoração - docs/permission_strategy.md - Estratégia de permissões - docs/redis_cache_setup.md - Setup do cache Redis - README.md atualizado com nova arquitetura Testes: - test_menu_navigation.py - Testes unitários de navegação - test_integration_menu.py - Testes de integração com Selenium Status dos testes: ✅ Funcionais: /, /dashboard, /pagamentos, /materiais ❌ Com problemas: /militantes, /cotas, /tipos-materiais, /admin/dashboard Hierarquia de permissões implementada: Admin → Acesso total CC → Acesso total CR → Dados do CR Setor → Dados do setor Célula → Dados da célula Próximos passos identificados: - Corrigir referências a Militante indefinido nos templates - Resolver problemas de campos inexistentes - Corrigir roteamento admin
304 lines
10 KiB
Python
304 lines
10 KiB
Python
from flask import Blueprint, request, render_template, redirect, url_for, flash, session, jsonify
|
|
from flask_login import login_user, logout_user, current_user
|
|
from datetime import datetime
|
|
from functions.database import get_db_connection, Usuario
|
|
from functions.decorators import require_login
|
|
from werkzeug.security import generate_password_hash
|
|
import pyotp
|
|
import qrcode
|
|
import base64
|
|
from io import BytesIO
|
|
|
|
auth_bp = Blueprint('auth', __name__)
|
|
|
|
@auth_bp.route("/login", methods=["GET", "POST"])
|
|
def login():
|
|
"""Rota de login"""
|
|
print(f"=== LOGIN ROUTE CALLED ===")
|
|
print(f"Method: {request.method}")
|
|
print(f"Form data: {dict(request.form)}")
|
|
|
|
if request.method == "POST":
|
|
email_or_username = request.form.get("email")
|
|
password = request.form.get("password")
|
|
otp = request.form.get("otp")
|
|
|
|
print(f"Tentativa de login - Email/Username: {email_or_username}, OTP: {otp}")
|
|
|
|
if not all([email_or_username, password]):
|
|
print("Erro: Email/usuário e senha são obrigatórios")
|
|
flash("Email/usuário e senha são obrigatórios.", "danger")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
db = get_db_connection()
|
|
try:
|
|
# Tenta encontrar o usuário por email ou username
|
|
user = db.query(Usuario).filter(
|
|
(Usuario.email == email_or_username) |
|
|
(Usuario.username == email_or_username)
|
|
).first()
|
|
|
|
print(f"Usuário encontrado: {user.username if user else 'Não encontrado'}")
|
|
|
|
if not user or not user.check_password(password):
|
|
print("Erro: Email/usuário ou senha incorretos")
|
|
flash("Email/usuário ou senha incorretos.", "danger")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
print(f"Senha válida. OTP Secret: {user.otp_secret}")
|
|
|
|
# Verificar OTP se o usuário tiver configurado
|
|
if user.otp_secret and not otp:
|
|
print("Erro: Código OTP é obrigatório")
|
|
flash("Código OTP é obrigatório para sua conta.", "danger")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
if user.otp_secret and not user.verify_otp(otp):
|
|
print(f"Erro: Código OTP inválido. Código fornecido: {otp}")
|
|
flash("Código OTP inválido.", "danger")
|
|
return redirect(url_for("auth.login"))
|
|
|
|
print("OTP válido! Fazendo login...")
|
|
|
|
# Atualizar último login
|
|
user.ultimo_login = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Fazer login e setar sessão
|
|
login_user(user)
|
|
session['user_id'] = user.id
|
|
session['username'] = user.username
|
|
session['is_admin'] = user.is_admin
|
|
print(f"Login realizado: user_id={user.id}, username={user.username}, is_admin={user.is_admin}")
|
|
|
|
# Redirecionar para home
|
|
return redirect(url_for("home.index"))
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template("login.html")
|
|
|
|
@auth_bp.route("/api/login", methods=["POST"])
|
|
def api_login():
|
|
"""Endpoint de login API sem CSRF para automação/testes"""
|
|
try:
|
|
# Verificar se é uma requisição JSON
|
|
if request.is_json:
|
|
data = request.get_json()
|
|
email_or_username = data.get("email") or data.get("username")
|
|
password = data.get("password")
|
|
otp = data.get("otp")
|
|
else:
|
|
# Fallback para form data
|
|
email_or_username = request.form.get("email") or request.form.get("username")
|
|
password = request.form.get("password")
|
|
otp = request.form.get("otp")
|
|
|
|
print(f"=== API LOGIN CALLED ===")
|
|
print(f"Email/Username: {email_or_username}")
|
|
print(f"OTP: {otp}")
|
|
|
|
# Validações básicas
|
|
if not email_or_username or not password:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Email/username e senha são obrigatórios'
|
|
}), 400
|
|
|
|
db = get_db_connection()
|
|
try:
|
|
# Buscar usuário
|
|
user = db.query(Usuario).filter(
|
|
(Usuario.email == email_or_username) |
|
|
(Usuario.username == email_or_username)
|
|
).first()
|
|
|
|
if not user:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Usuário não encontrado'
|
|
}), 401
|
|
|
|
# Verificar senha
|
|
if not user.check_password(password):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Senha incorreta'
|
|
}), 401
|
|
|
|
# Verificar OTP se configurado
|
|
if user.otp_secret:
|
|
if not otp:
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Código OTP é obrigatório para esta conta'
|
|
}), 400
|
|
|
|
if not user.verify_otp(otp):
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Código OTP inválido'
|
|
}), 401
|
|
|
|
# Atualizar último login
|
|
user.ultimo_login = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Fazer login
|
|
login_user(user)
|
|
session['user_id'] = user.id
|
|
session['username'] = user.username
|
|
session['is_admin'] = user.is_admin
|
|
|
|
print(f"API Login realizado: user_id={user.id}, username={user.username}")
|
|
|
|
# Retornar resposta de sucesso
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Login realizado com sucesso',
|
|
'user': {
|
|
'id': user.id,
|
|
'username': user.username,
|
|
'email': user.email,
|
|
'nome': user.nome,
|
|
'is_admin': user.is_admin,
|
|
'ultimo_login': user.ultimo_login.isoformat() if user.ultimo_login else None
|
|
},
|
|
'session_id': session.get('_id')
|
|
})
|
|
|
|
finally:
|
|
db.close()
|
|
|
|
except Exception as e:
|
|
print(f"Erro no API login: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Erro interno do servidor'
|
|
}), 500
|
|
|
|
@auth_bp.route("/api/logout", methods=["POST"])
|
|
def api_logout():
|
|
"""Endpoint de logout API"""
|
|
try:
|
|
if current_user.is_authenticated:
|
|
db = get_db_connection()
|
|
try:
|
|
user = current_user
|
|
user.logout()
|
|
db.commit()
|
|
finally:
|
|
db.close()
|
|
|
|
logout_user()
|
|
|
|
return jsonify({
|
|
'success': True,
|
|
'message': 'Logout realizado com sucesso'
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"Erro no API logout: {e}")
|
|
return jsonify({
|
|
'success': False,
|
|
'error': 'Erro interno do servidor'
|
|
}), 500
|
|
|
|
@auth_bp.route("/api/status")
|
|
def api_status():
|
|
"""Endpoint para verificar status da autenticação"""
|
|
if current_user.is_authenticated:
|
|
return jsonify({
|
|
'authenticated': True,
|
|
'user': {
|
|
'id': current_user.id,
|
|
'username': current_user.username,
|
|
'email': current_user.email,
|
|
'nome': current_user.nome,
|
|
'is_admin': current_user.is_admin
|
|
}
|
|
})
|
|
else:
|
|
return jsonify({
|
|
'authenticated': False
|
|
})
|
|
|
|
@auth_bp.route("/logout")
|
|
@require_login
|
|
def logout():
|
|
db = get_db_connection()
|
|
try:
|
|
user = current_user
|
|
if user:
|
|
user.logout()
|
|
db.commit()
|
|
logout_user()
|
|
finally:
|
|
db.close()
|
|
flash('Logout realizado com sucesso!', 'success')
|
|
return redirect(url_for('auth.login'))
|
|
|
|
@auth_bp.route("/alterar_senha", methods=["GET", "POST"])
|
|
@require_login
|
|
def alterar_senha():
|
|
"""Rota para alterar a senha do usuário"""
|
|
if request.method == "POST":
|
|
senha_atual = request.form.get("senha_atual")
|
|
nova_senha = request.form.get("nova_senha")
|
|
confirmar_senha = request.form.get("confirmar_senha")
|
|
|
|
if not all([senha_atual, nova_senha, confirmar_senha]):
|
|
flash("Todos os campos são obrigatórios.", "error")
|
|
return redirect(url_for("auth.alterar_senha"))
|
|
|
|
if nova_senha != confirmar_senha:
|
|
flash("As senhas não coincidem.", "error")
|
|
return redirect(url_for("auth.alterar_senha"))
|
|
|
|
db = get_db_connection()
|
|
try:
|
|
user = db.query(Usuario).get(current_user.id)
|
|
if not user.check_password(senha_atual):
|
|
flash("Senha atual incorreta.", "error")
|
|
return redirect(url_for("auth.alterar_senha"))
|
|
|
|
user.password_hash = generate_password_hash(nova_senha)
|
|
db.commit()
|
|
flash("Senha alterada com sucesso!", "success")
|
|
return redirect(url_for("home.index"))
|
|
finally:
|
|
db.close()
|
|
|
|
return render_template("alterar_senha.html")
|
|
|
|
@auth_bp.route("/qr/<token>")
|
|
def get_qr_code(token):
|
|
"""Gera QR code para configuração OTP"""
|
|
db = get_db_connection()
|
|
try:
|
|
militante = db.query(Militante).filter_by(temp_token=token).first()
|
|
if not militante or militante.temp_token_expiry < datetime.now():
|
|
flash('Token inválido ou expirado.', 'danger')
|
|
return redirect(url_for('auth.login'))
|
|
|
|
qr_code = generate_qr_code(militante)
|
|
return render_template('mostrar_qr_code.html', qr_code=qr_code)
|
|
finally:
|
|
db.close()
|
|
|
|
def generate_qr_code(user):
|
|
"""Gera um QR code para o usuário"""
|
|
if not user.otp_secret:
|
|
user.otp_secret = pyotp.random_base32()
|
|
|
|
totp = pyotp.TOTP(user.otp_secret)
|
|
qr = qrcode.QRCode(version=1, box_size=10, border=5)
|
|
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
|
|
qr.make(fit=True)
|
|
|
|
img = qr.make_image(fill_color="black", back_color="white")
|
|
buffer = BytesIO()
|
|
img.save(buffer, format="PNG")
|
|
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
|
|
|
return qr_code |