- padronizando o nome de get_db_connection e session para get_db_session, para não confundir com session do Flask ou sessoes web
- corrigindo potenciais erros
-- has_permission nao consegue com lazy load carregar permission depois de load_user fechar a conexao, entao joinedLoad com Permission antes de fechar
-- db.rollback não existe caso db = get_db_session() apareça muito depois dentro do try, padronizando antes de try
--- comparar role por nivel (Role.SECRETARIO_GERAL) e nao por nome ("Secretario Geral")
- unificacao de get_otp_qr_code
- mudança de nowutc() para now(UTC) conforme novo padrão
254 lines
10 KiB
Python
254 lines
10 KiB
Python
from functions.database import get_db_session, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento
|
|
from sqlalchemy import func
|
|
from sqlalchemy.orm import joinedload
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Any
|
|
from services.cache_service import cache_service, cached, CacheKeys, invalidate_cache_pattern
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DashboardService:
|
|
"""Service for dashboard data aggregation with caching"""
|
|
|
|
@staticmethod
|
|
@cached(expire=300, key_prefix="dashboard") # Cache for 5 minutes
|
|
def get_dashboard_stats() -> Dict[str, Any]:
|
|
"""Get dashboard statistics with caching"""
|
|
db = get_db_session()
|
|
try:
|
|
# Get cached stats first
|
|
cache_key = CacheKeys.DASHBOARD_STATS
|
|
cached_stats = cache_service.get(cache_key)
|
|
if cached_stats:
|
|
logger.debug("Using cached dashboard stats")
|
|
return cached_stats
|
|
|
|
# Calculate fresh stats
|
|
stats = DashboardService._calculate_stats(db)
|
|
|
|
# Cache the results
|
|
cache_service.set(cache_key, stats, 300) # 5 minutes
|
|
logger.debug("Cached fresh dashboard stats")
|
|
|
|
return stats
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting dashboard stats: {e}")
|
|
return DashboardService._get_default_stats()
|
|
finally:
|
|
db.close()
|
|
|
|
@staticmethod
|
|
def _calculate_stats(db) -> Dict[str, Any]:
|
|
"""Calculate dashboard statistics"""
|
|
try:
|
|
# Total militantes
|
|
total_militantes = db.query(func.count(Militante.id)).scalar()
|
|
|
|
# Total cotas (soma dos valores)
|
|
total_cotas_result = db.query(func.sum(CotaMensal.valor_novo)).scalar()
|
|
total_cotas = f"{total_cotas_result:.2f}" if total_cotas_result else "0.00"
|
|
|
|
# Total de materiais vendidos
|
|
total_materiais = db.query(func.count(MaterialVendido.id)).scalar()
|
|
|
|
# Total de assinaturas ativas
|
|
total_assinaturas = db.query(func.count(AssinaturaAnual.id)).scalar()
|
|
|
|
# Últimos militantes cadastrados (limit 5) - eager load emails
|
|
militantes_query = db.query(Militante).options(
|
|
joinedload(Militante.emails)
|
|
).order_by(Militante.id.desc()).limit(5).all()
|
|
|
|
# Convert militantes to dictionaries to avoid lazy loading issues
|
|
ultimos_militantes = []
|
|
for militante in militantes_query:
|
|
militante_dict = {
|
|
'id': militante.id,
|
|
'nome': militante.nome,
|
|
'emails': [{'endereco_email': email.endereco_email} for email in militante.emails]
|
|
}
|
|
ultimos_militantes.append(militante_dict)
|
|
|
|
# Últimos pagamentos (limit 5) - eager load militante
|
|
pagamentos_query = db.query(Pagamento).options(
|
|
joinedload(Pagamento.militante)
|
|
).order_by(Pagamento.data_pagamento.desc()).limit(5).all()
|
|
|
|
# Convert pagamentos to dictionaries to avoid lazy loading issues
|
|
ultimos_pagamentos = []
|
|
for pagamento in pagamentos_query:
|
|
pagamento_dict = {
|
|
'id': pagamento.id,
|
|
'valor': pagamento.valor,
|
|
'data_pagamento': pagamento.data_pagamento,
|
|
'militante': {
|
|
'id': pagamento.militante.id,
|
|
'nome': pagamento.militante.nome
|
|
}
|
|
}
|
|
ultimos_pagamentos.append(pagamento_dict)
|
|
|
|
# Estatísticas por período
|
|
hoje = datetime.now().date()
|
|
inicio_mes = hoje.replace(day=1)
|
|
|
|
# Militantes cadastrados este mês
|
|
militantes_mes = db.query(func.count(Militante.id)).filter(
|
|
Militante.id >= 1 # Assuming ID is auto-increment
|
|
).scalar()
|
|
|
|
# Pagamentos este mês
|
|
pagamentos_mes = db.query(func.sum(Pagamento.valor)).filter(
|
|
Pagamento.data_pagamento >= inicio_mes
|
|
).scalar()
|
|
total_pagamentos_mes = f"{pagamentos_mes:.2f}" if pagamentos_mes else "0.00"
|
|
|
|
return {
|
|
'total_militantes': total_militantes,
|
|
'total_cotas': total_cotas,
|
|
'total_materiais': total_materiais,
|
|
'total_assinaturas': total_assinaturas,
|
|
'ultimos_militantes': ultimos_militantes,
|
|
'ultimos_pagamentos': ultimos_pagamentos,
|
|
'militantes_mes': militantes_mes,
|
|
'pagamentos_mes': total_pagamentos_mes,
|
|
'cache_timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error calculating dashboard stats: {e}")
|
|
return DashboardService._get_default_stats()
|
|
|
|
@staticmethod
|
|
def _get_default_stats() -> Dict[str, Any]:
|
|
"""Get default statistics when calculation fails"""
|
|
return {
|
|
'total_militantes': 0,
|
|
'total_cotas': "0.00",
|
|
'total_materiais': 0,
|
|
'total_assinaturas': 0,
|
|
'ultimos_militantes': [],
|
|
'ultimos_pagamentos': [],
|
|
'militantes_mes': 0,
|
|
'pagamentos_mes': "0.00",
|
|
'cache_timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
@staticmethod
|
|
@invalidate_cache_pattern("dashboard:*")
|
|
def invalidate_dashboard_cache():
|
|
"""Invalidate dashboard cache when data changes"""
|
|
logger.info("Dashboard cache invalidated")
|
|
|
|
@staticmethod
|
|
@cached(expire=600, key_prefix="dashboard") # Cache for 10 minutes
|
|
def get_militante_stats() -> Dict[str, Any]:
|
|
"""Get militante-specific statistics"""
|
|
db = get_db_session()
|
|
try:
|
|
# Militantes por estado
|
|
estados = db.query(Militante.estado, func.count(Militante.id)).group_by(Militante.estado).all()
|
|
|
|
# Militantes por responsabilidade
|
|
responsabilidades = {}
|
|
militantes = db.query(Militante).all()
|
|
|
|
for militante in militantes:
|
|
for resp in militante.get_responsabilidades():
|
|
responsabilidades[resp] = responsabilidades.get(resp, 0) + 1
|
|
|
|
return {
|
|
'estados': dict(estados),
|
|
'responsabilidades': responsabilidades,
|
|
'cache_timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting militante stats: {e}")
|
|
return {'estados': {}, 'responsabilidades': {}, 'cache_timestamp': datetime.now().isoformat()}
|
|
finally:
|
|
db.close()
|
|
|
|
@staticmethod
|
|
@cached(expire=300, key_prefix="dashboard")
|
|
def get_financial_stats() -> Dict[str, Any]:
|
|
"""Get financial statistics"""
|
|
db = get_db_session()
|
|
try:
|
|
# Total de pagamentos
|
|
total_pagamentos = db.query(func.sum(Pagamento.valor)).scalar()
|
|
|
|
# Pagamentos por mês (últimos 6 meses)
|
|
hoje = datetime.now().date()
|
|
stats_mensais = []
|
|
|
|
for i in range(6):
|
|
inicio_mes = hoje.replace(day=1) - timedelta(days=30*i)
|
|
fim_mes = inicio_mes.replace(day=28) + timedelta(days=4)
|
|
fim_mes = fim_mes.replace(day=1) - timedelta(days=1)
|
|
|
|
valor_mes = db.query(func.sum(Pagamento.valor)).filter(
|
|
Pagamento.data_pagamento >= inicio_mes,
|
|
Pagamento.data_pagamento <= fim_mes
|
|
).scalar()
|
|
|
|
stats_mensais.append({
|
|
'mes': inicio_mes.strftime('%Y-%m'),
|
|
'valor': float(valor_mes) if valor_mes else 0.0
|
|
})
|
|
|
|
return {
|
|
'total_pagamentos': float(total_pagamentos) if total_pagamentos else 0.0,
|
|
'stats_mensais': stats_mensais,
|
|
'cache_timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting financial stats: {e}")
|
|
return {
|
|
'total_pagamentos': 0.0,
|
|
'stats_mensais': [],
|
|
'cache_timestamp': datetime.now().isoformat()
|
|
}
|
|
finally:
|
|
db.close()
|
|
|
|
@staticmethod
|
|
def obter_ultimos_militantes(limite: int = 5) -> List[Militante]:
|
|
"""Obtém os últimos militantes cadastrados"""
|
|
db = get_db_session()
|
|
try:
|
|
return db.query(Militante).order_by(Militante.id.desc()).limit(limite).all()
|
|
finally:
|
|
db.close()
|
|
|
|
@staticmethod
|
|
def obter_ultimos_pagamentos(limite: int = 5) -> List[Pagamento]:
|
|
"""Obtém os últimos pagamentos realizados"""
|
|
db = get_db_session()
|
|
try:
|
|
return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).limit(limite).all()
|
|
finally:
|
|
db.close()
|
|
|
|
@staticmethod
|
|
def obter_tipos_pagamento() -> List[TipoPagamento]:
|
|
"""Obtém todos os tipos de pagamento"""
|
|
db = get_db_session()
|
|
try:
|
|
return db.query(TipoPagamento).all()
|
|
finally:
|
|
db.close()
|
|
|
|
@staticmethod
|
|
def obter_dados_dashboard() -> Dict:
|
|
"""Obtém todos os dados necessários para o dashboard"""
|
|
return {
|
|
'estatisticas': DashboardService.get_dashboard_stats(),
|
|
'ultimos_militantes': DashboardService.obter_ultimos_militantes(),
|
|
'ultimos_pagamentos': DashboardService.obter_ultimos_pagamentos(),
|
|
'tipos_pagamento': DashboardService.obter_tipos_pagamento(),
|
|
'data_atual': datetime.now().strftime("%d/%m/%Y")
|
|
} |