feat: Implementar arquitetura de permissões no nível de dados

BREAKING CHANGES:
- Sistema de permissões movido do nível de template para nível de dados
- Menus sempre visíveis, controle transparente no backend
- Templates nunca quebram, sempre renderizam com dados filtrados

Features:
-  Arquitetura MVC completa implementada
-  Controllers com filtragem hierárquica de dados
-  Template helpers simplificados (user_can sempre True)
-  Controle de acesso baseado na hierarquia organizacional
-  Regra especial para tesoureiros (acesso completo)
-  Tratamento robusto de erros em todos os controllers

Controllers implementados:
- militante_controller.py - Filtragem por célula/setor/CR/CC
- cota_controller.py - Controle baseado em permissões
- material_controller.py - Acesso flexível por nível
- pagamento_controller.py - Filtragem organizacional
- auth_controller.py - Autenticação com OTP
- home_controller.py - Dashboard com estatísticas
- usuario_controller.py - Gestão de usuários

Templates corrigidos:
- listar_cotas.html - URLs corrigidas (nova_cota → cota.nova)
- listar_tipos_materiais.html - Variáveis ajustadas (tipos → tipos_materiais)
- base.html - Menus sempre visíveis
- Diversos templates com correções de URLs e referências

Services implementados:
- auth_service.py - Lógica de autenticação
- dashboard_service.py - Estatísticas do dashboard
- cache_service.py - Integração com Redis
- celula_service.py - Operações de células

Models implementados:
- militante_model.py - Operações de militantes
- pagamento_model.py - Operações de pagamentos

Documentação:
- docs/permission_fixes_summary.md - Resumo completo das correções
- docs/architecture_summary.md - Arquitetura MVC
- docs/mvc_refactoring.md - Detalhes da refatoração
- docs/permission_strategy.md - Estratégia de permissões
- docs/redis_cache_setup.md - Setup do cache Redis
- README.md atualizado com nova arquitetura

Testes:
- test_menu_navigation.py - Testes unitários de navegação
- test_integration_menu.py - Testes de integração com Selenium

Status dos testes:
 Funcionais: /, /dashboard, /pagamentos, /materiais
 Com problemas: /militantes, /cotas, /tipos-materiais, /admin/dashboard

Hierarquia de permissões implementada:
Admin → Acesso total
CC → Acesso total
CR → Dados do CR
Setor → Dados do setor
Célula → Dados da célula

Próximos passos identificados:
- Corrigir referências a Militante indefinido nos templates
- Resolver problemas de campos inexistentes
- Corrigir roteamento admin
This commit is contained in:
LS
2025-07-01 13:42:56 -03:00
parent d283bced4b
commit 7399e0000e
62 changed files with 5897 additions and 1983 deletions

1
services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Services package

157
services/auth_service.py Normal file
View File

@@ -0,0 +1,157 @@
from functions.database import get_db_connection, Usuario
from flask_login import login_user, logout_user
from datetime import datetime
from typing import Dict, Optional
import pyotp
import qrcode
import base64
from io import BytesIO
class AuthService:
"""Service para operações de autenticação"""
@staticmethod
def autenticar_usuario(email_or_username: str, password: str, otp: str = None) -> Dict:
"""Autentica um usuário"""
db = get_db_connection()
try:
# Tenta encontrar o usuário por email ou username
user = db.query(Usuario).filter(
(Usuario.email == email_or_username) |
(Usuario.username == email_or_username)
).first()
if not user or not user.check_password(password):
return {
'status': 'error',
'message': 'Email/usuário ou senha incorretos.'
}
# Verificar OTP se o usuário tiver configurado
if user.otp_secret and not otp:
return {
'status': 'error',
'message': 'Código OTP é obrigatório para sua conta.'
}
if user.otp_secret and not user.verify_otp(otp):
return {
'status': 'error',
'message': 'Código OTP inválido.'
}
# Atualizar último login
user.ultimo_login = datetime.utcnow()
db.commit()
# Fazer login
login_user(user)
return {
'status': 'success',
'user': user
}
except Exception as e:
db.rollback()
return {
'status': 'error',
'message': f'Erro na autenticação: {str(e)}'
}
finally:
db.close()
@staticmethod
def desautenticar_usuario(user) -> Dict:
"""Desautentica um usuário"""
db = get_db_connection()
try:
if user:
user.logout()
db.commit()
logout_user()
return {
'status': 'success',
'message': 'Logout realizado com sucesso!'
}
except Exception as e:
db.rollback()
return {
'status': 'error',
'message': f'Erro no logout: {str(e)}'
}
finally:
db.close()
@staticmethod
def alterar_senha(user_id: int, senha_atual: str, nova_senha: str) -> Dict:
"""Altera a senha de um usuário"""
db = get_db_connection()
try:
user = db.query(Usuario).get(user_id)
if not user:
return {
'status': 'error',
'message': 'Usuário não encontrado.'
}
if not user.check_password(senha_atual):
return {
'status': 'error',
'message': 'Senha atual incorreta.'
}
user.set_password(nova_senha)
db.commit()
return {
'status': 'success',
'message': 'Senha alterada com sucesso!'
}
except Exception as e:
db.rollback()
return {
'status': 'error',
'message': f'Erro ao alterar senha: {str(e)}'
}
finally:
db.close()
@staticmethod
def gerar_qr_code(user) -> str:
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code
@staticmethod
def verificar_sessao(user) -> Dict:
"""Verifica se a sessão do usuário ainda é válida"""
if not user.is_authenticated:
return {
'valid': False,
'message': 'Usuário não autenticado'
}
if user.is_session_expired():
return {
'valid': False,
'message': 'Sessão expirada'
}
return {
'valid': True
}

268
services/cache_service.py Normal file
View File

@@ -0,0 +1,268 @@
import redis
import json
import pickle
from typing import Any, Optional, Union, Dict, List
from datetime import timedelta
import os
import logging
from functools import wraps
import hashlib
logger = logging.getLogger(__name__)
class CacheService:
"""Service for Redis caching operations"""
def __init__(self, redis_url: str = None):
"""Initialize Redis connection"""
self.redis_url = redis_url or os.getenv('REDIS_URL', 'redis://localhost:6379/0')
self.redis = None
self._connect()
def _connect(self):
"""Establish Redis connection"""
try:
self.redis = redis.from_url(self.redis_url, decode_responses=False)
# Test connection
self.redis.ping()
logger.info("Redis connection established successfully")
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
self.redis = None
def _is_connected(self) -> bool:
"""Check if Redis is connected"""
if not self.redis:
return False
try:
self.redis.ping()
return True
except:
return False
def get(self, key: str, default: Any = None) -> Any:
"""Get value from cache"""
if not self._is_connected():
return default
try:
value = self.redis.get(key)
if value is None:
return default
return pickle.loads(value)
except Exception as e:
logger.error(f"Error getting cache key {key}: {e}")
return default
def set(self, key: str, value: Any, expire: int = 3600) -> bool:
"""Set value in cache with expiration"""
if not self._is_connected():
return False
try:
serialized_value = pickle.dumps(value)
return self.redis.setex(key, expire, serialized_value)
except Exception as e:
logger.error(f"Error setting cache key {key}: {e}")
return False
def delete(self, key: str) -> bool:
"""Delete key from cache"""
if not self._is_connected():
return False
try:
return bool(self.redis.delete(key))
except Exception as e:
logger.error(f"Error deleting cache key {key}: {e}")
return False
def exists(self, key: str) -> bool:
"""Check if key exists in cache"""
if not self._is_connected():
return False
try:
return bool(self.redis.exists(key))
except Exception as e:
logger.error(f"Error checking cache key {key}: {e}")
return False
def expire(self, key: str, seconds: int) -> bool:
"""Set expiration for key"""
if not self._is_connected():
return False
try:
return bool(self.redis.expire(key, seconds))
except Exception as e:
logger.error(f"Error setting expiration for cache key {key}: {e}")
return False
def ttl(self, key: str) -> int:
"""Get time to live for key"""
if not self._is_connected():
return -1
try:
return self.redis.ttl(key)
except Exception as e:
logger.error(f"Error getting TTL for cache key {key}: {e}")
return -1
def clear_pattern(self, pattern: str) -> int:
"""Clear all keys matching pattern"""
if not self._is_connected():
return 0
try:
keys = self.redis.keys(pattern)
if keys:
return self.redis.delete(*keys)
return 0
except Exception as e:
logger.error(f"Error clearing cache pattern {pattern}: {e}")
return 0
def clear_all(self) -> bool:
"""Clear all cache"""
if not self._is_connected():
return False
try:
self.redis.flushdb()
return True
except Exception as e:
logger.error(f"Error clearing all cache: {e}")
return False
def get_many(self, keys: List[str]) -> Dict[str, Any]:
"""Get multiple values from cache"""
if not self._is_connected():
return {}
try:
values = self.redis.mget(keys)
result = {}
for key, value in zip(keys, values):
if value is not None:
result[key] = pickle.loads(value)
return result
except Exception as e:
logger.error(f"Error getting multiple cache keys: {e}")
return {}
def set_many(self, data: Dict[str, Any], expire: int = 3600) -> bool:
"""Set multiple values in cache"""
if not self._is_connected():
return False
try:
pipeline = self.redis.pipeline()
for key, value in data.items():
serialized_value = pickle.dumps(value)
pipeline.setex(key, expire, serialized_value)
pipeline.execute()
return True
except Exception as e:
logger.error(f"Error setting multiple cache keys: {e}")
return False
def increment(self, key: str, amount: int = 1) -> Optional[int]:
"""Increment counter in cache"""
if not self._is_connected():
return None
try:
return self.redis.incr(key, amount)
except Exception as e:
logger.error(f"Error incrementing cache key {key}: {e}")
return None
def decrement(self, key: str, amount: int = 1) -> Optional[int]:
"""Decrement counter in cache"""
if not self._is_connected():
return None
try:
return self.redis.decr(key, amount)
except Exception as e:
logger.error(f"Error decrementing cache key {key}: {e}")
return None
# Global cache instance
cache_service = CacheService()
def cache_key_generator(*args, **kwargs) -> str:
"""Generate cache key from function arguments"""
# Create a hash of the arguments
key_data = str(args) + str(sorted(kwargs.items()))
return hashlib.md5(key_data.encode()).hexdigest()
def cached(expire: int = 3600, key_prefix: str = ""):
"""Decorator for caching function results"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Generate cache key
func_key = f"{key_prefix}:{func.__name__}:{cache_key_generator(*args, **kwargs)}"
# Try to get from cache
cached_result = cache_service.get(func_key)
if cached_result is not None:
logger.debug(f"Cache hit for {func_key}")
return cached_result
# Execute function and cache result
result = func(*args, **kwargs)
cache_service.set(func_key, result, expire)
logger.debug(f"Cache miss for {func_key}, stored result")
return result
return wrapper
return decorator
def invalidate_cache_pattern(pattern: str):
"""Decorator to invalidate cache after function execution"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
result = func(*args, **kwargs)
cache_service.clear_pattern(pattern)
logger.debug(f"Invalidated cache pattern: {pattern}")
return result
return wrapper
return decorator
# Cache key constants
class CacheKeys:
"""Constants for cache keys"""
MILITANTE_LIST = "militantes:list"
MILITANTE_DETAIL = "militante:detail:{}"
PAGAMENTO_LIST = "pagamentos:list"
PAGAMENTO_DETAIL = "pagamento:detail:{}"
COTA_LIST = "cotas:list"
COTA_DETAIL = "cota:detail:{}"
DASHBOARD_STATS = "dashboard:stats"
USER_SESSION = "user:session:{}"
API_RESPONSE = "api:response:{}"
@staticmethod
def militante_detail(militante_id: int) -> str:
return CacheKeys.MILITANTE_DETAIL.format(militante_id)
@staticmethod
def pagamento_detail(pagamento_id: int) -> str:
return CacheKeys.PAGAMENTO_DETAIL.format(pagamento_id)
@staticmethod
def cota_detail(cota_id: int) -> str:
return CacheKeys.COTA_DETAIL.format(cota_id)
@staticmethod
def user_session(user_id: int) -> str:
return CacheKeys.USER_SESSION.format(user_id)
@staticmethod
def api_response(endpoint: str) -> str:
return CacheKeys.API_RESPONSE.format(endpoint)

View File

@@ -0,0 +1,78 @@
from services.database_service import DatabaseService
from models.entities.celula import Celula
class CelulaService:
"""Service for Celula operations"""
@staticmethod
def get_all_celulas():
"""Get all celulas from the database"""
db = DatabaseService.get_db_connection()
try:
celulas = db.query(Celula).all()
return celulas
finally:
db.close()
@staticmethod
def get_celula_by_id(celula_id):
"""Get a celula by its ID"""
db = DatabaseService.get_db_connection()
try:
celula = db.query(Celula).get(celula_id)
return celula
finally:
db.close()
@staticmethod
def create_celula(data):
"""Create a new celula"""
db = DatabaseService.get_db_connection()
try:
celula = Celula(**data)
db.add(celula)
db.commit()
return celula
except Exception as e:
db.rollback()
raise e
finally:
db.close()
@staticmethod
def update_celula(celula_id, data):
"""Update an existing celula"""
db = DatabaseService.get_db_connection()
try:
celula = db.query(Celula).get(celula_id)
if not celula:
return None
for key, value in data.items():
setattr(celula, key, value)
db.commit()
return celula
except Exception as e:
db.rollback()
raise e
finally:
db.close()
@staticmethod
def delete_celula(celula_id):
"""Delete a celula"""
db = DatabaseService.get_db_connection()
try:
celula = db.query(Celula).get(celula_id)
if not celula:
return False
db.delete(celula)
db.commit()
return True
except Exception as e:
db.rollback()
raise e
finally:
db.close()

View File

@@ -0,0 +1,254 @@
from functions.database import get_db_connection, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from datetime import datetime, timedelta
from typing import Dict, List, Any
from services.cache_service import cache_service, cached, CacheKeys, invalidate_cache_pattern
import logging
logger = logging.getLogger(__name__)
class DashboardService:
"""Service for dashboard data aggregation with caching"""
@staticmethod
@cached(expire=300, key_prefix="dashboard") # Cache for 5 minutes
def get_dashboard_stats() -> Dict[str, Any]:
"""Get dashboard statistics with caching"""
db = get_db_connection()
try:
# Get cached stats first
cache_key = CacheKeys.DASHBOARD_STATS
cached_stats = cache_service.get(cache_key)
if cached_stats:
logger.debug("Using cached dashboard stats")
return cached_stats
# Calculate fresh stats
stats = DashboardService._calculate_stats(db)
# Cache the results
cache_service.set(cache_key, stats, 300) # 5 minutes
logger.debug("Cached fresh dashboard stats")
return stats
except Exception as e:
logger.error(f"Error getting dashboard stats: {e}")
return DashboardService._get_default_stats()
finally:
db.close()
@staticmethod
def _calculate_stats(db) -> Dict[str, Any]:
"""Calculate dashboard statistics"""
try:
# Total militantes
total_militantes = db.query(func.count(Militante.id)).scalar()
# Total cotas (soma dos valores)
total_cotas_result = db.query(func.sum(CotaMensal.valor_novo)).scalar()
total_cotas = f"{total_cotas_result:.2f}" if total_cotas_result else "0.00"
# Total de materiais vendidos
total_materiais = db.query(func.count(MaterialVendido.id)).scalar()
# Total de assinaturas ativas
total_assinaturas = db.query(func.count(AssinaturaAnual.id)).scalar()
# Últimos militantes cadastrados (limit 5) - eager load emails
militantes_query = db.query(Militante).options(
joinedload(Militante.emails)
).order_by(Militante.id.desc()).limit(5).all()
# Convert militantes to dictionaries to avoid lazy loading issues
ultimos_militantes = []
for militante in militantes_query:
militante_dict = {
'id': militante.id,
'nome': militante.nome,
'emails': [{'endereco_email': email.endereco_email} for email in militante.emails]
}
ultimos_militantes.append(militante_dict)
# Últimos pagamentos (limit 5) - eager load militante
pagamentos_query = db.query(Pagamento).options(
joinedload(Pagamento.militante)
).order_by(Pagamento.data_pagamento.desc()).limit(5).all()
# Convert pagamentos to dictionaries to avoid lazy loading issues
ultimos_pagamentos = []
for pagamento in pagamentos_query:
pagamento_dict = {
'id': pagamento.id,
'valor': pagamento.valor,
'data_pagamento': pagamento.data_pagamento,
'militante': {
'id': pagamento.militante.id,
'nome': pagamento.militante.nome
}
}
ultimos_pagamentos.append(pagamento_dict)
# Estatísticas por período
hoje = datetime.now().date()
inicio_mes = hoje.replace(day=1)
# Militantes cadastrados este mês
militantes_mes = db.query(func.count(Militante.id)).filter(
Militante.id >= 1 # Assuming ID is auto-increment
).scalar()
# Pagamentos este mês
pagamentos_mes = db.query(func.sum(Pagamento.valor)).filter(
Pagamento.data_pagamento >= inicio_mes
).scalar()
total_pagamentos_mes = f"{pagamentos_mes:.2f}" if pagamentos_mes else "0.00"
return {
'total_militantes': total_militantes,
'total_cotas': total_cotas,
'total_materiais': total_materiais,
'total_assinaturas': total_assinaturas,
'ultimos_militantes': ultimos_militantes,
'ultimos_pagamentos': ultimos_pagamentos,
'militantes_mes': militantes_mes,
'pagamentos_mes': total_pagamentos_mes,
'cache_timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error calculating dashboard stats: {e}")
return DashboardService._get_default_stats()
@staticmethod
def _get_default_stats() -> Dict[str, Any]:
"""Get default statistics when calculation fails"""
return {
'total_militantes': 0,
'total_cotas': "0.00",
'total_materiais': 0,
'total_assinaturas': 0,
'ultimos_militantes': [],
'ultimos_pagamentos': [],
'militantes_mes': 0,
'pagamentos_mes': "0.00",
'cache_timestamp': datetime.now().isoformat()
}
@staticmethod
@invalidate_cache_pattern("dashboard:*")
def invalidate_dashboard_cache():
"""Invalidate dashboard cache when data changes"""
logger.info("Dashboard cache invalidated")
@staticmethod
@cached(expire=600, key_prefix="dashboard") # Cache for 10 minutes
def get_militante_stats() -> Dict[str, Any]:
"""Get militante-specific statistics"""
db = get_db_connection()
try:
# Militantes por estado
estados = db.query(Militante.estado, func.count(Militante.id)).group_by(Militante.estado).all()
# Militantes por responsabilidade
responsabilidades = {}
militantes = db.query(Militante).all()
for militante in militantes:
for resp in militante.get_responsabilidades():
responsabilidades[resp] = responsabilidades.get(resp, 0) + 1
return {
'estados': dict(estados),
'responsabilidades': responsabilidades,
'cache_timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error getting militante stats: {e}")
return {'estados': {}, 'responsabilidades': {}, 'cache_timestamp': datetime.now().isoformat()}
finally:
db.close()
@staticmethod
@cached(expire=300, key_prefix="dashboard")
def get_financial_stats() -> Dict[str, Any]:
"""Get financial statistics"""
db = get_db_connection()
try:
# Total de pagamentos
total_pagamentos = db.query(func.sum(Pagamento.valor)).scalar()
# Pagamentos por mês (últimos 6 meses)
hoje = datetime.now().date()
stats_mensais = []
for i in range(6):
inicio_mes = hoje.replace(day=1) - timedelta(days=30*i)
fim_mes = inicio_mes.replace(day=28) + timedelta(days=4)
fim_mes = fim_mes.replace(day=1) - timedelta(days=1)
valor_mes = db.query(func.sum(Pagamento.valor)).filter(
Pagamento.data_pagamento >= inicio_mes,
Pagamento.data_pagamento <= fim_mes
).scalar()
stats_mensais.append({
'mes': inicio_mes.strftime('%Y-%m'),
'valor': float(valor_mes) if valor_mes else 0.0
})
return {
'total_pagamentos': float(total_pagamentos) if total_pagamentos else 0.0,
'stats_mensais': stats_mensais,
'cache_timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error getting financial stats: {e}")
return {
'total_pagamentos': 0.0,
'stats_mensais': [],
'cache_timestamp': datetime.now().isoformat()
}
finally:
db.close()
@staticmethod
def obter_ultimos_militantes(limite: int = 5) -> List[Militante]:
"""Obtém os últimos militantes cadastrados"""
db = get_db_connection()
try:
return db.query(Militante).order_by(Militante.id.desc()).limit(limite).all()
finally:
db.close()
@staticmethod
def obter_ultimos_pagamentos(limite: int = 5) -> List[Pagamento]:
"""Obtém os últimos pagamentos realizados"""
db = get_db_connection()
try:
return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).limit(limite).all()
finally:
db.close()
@staticmethod
def obter_tipos_pagamento() -> List[TipoPagamento]:
"""Obtém todos os tipos de pagamento"""
db = get_db_connection()
try:
return db.query(TipoPagamento).all()
finally:
db.close()
@staticmethod
def obter_dados_dashboard() -> Dict:
"""Obtém todos os dados necessários para o dashboard"""
return {
'estatisticas': DashboardService.get_dashboard_stats(),
'ultimos_militantes': DashboardService.obter_ultimos_militantes(),
'ultimos_pagamentos': DashboardService.obter_ultimos_pagamentos(),
'tipos_pagamento': DashboardService.obter_tipos_pagamento(),
'data_atual': datetime.now().strftime("%d/%m/%Y")
}