from functions.database import get_db_connection, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento from sqlalchemy import func from sqlalchemy.orm import joinedload from datetime import datetime, timedelta from typing import Dict, List, Any from services.cache_service import cache_service, cached, CacheKeys, invalidate_cache_pattern import logging logger = logging.getLogger(__name__) class DashboardService: """Service for dashboard data aggregation with caching""" @staticmethod @cached(expire=300, key_prefix="dashboard") # Cache for 5 minutes def get_dashboard_stats() -> Dict[str, Any]: """Get dashboard statistics with caching""" db = get_db_connection() try: # Get cached stats first cache_key = CacheKeys.DASHBOARD_STATS cached_stats = cache_service.get(cache_key) if cached_stats: logger.debug("Using cached dashboard stats") return cached_stats # Calculate fresh stats stats = DashboardService._calculate_stats(db) # Cache the results cache_service.set(cache_key, stats, 300) # 5 minutes logger.debug("Cached fresh dashboard stats") return stats except Exception as e: logger.error(f"Error getting dashboard stats: {e}") return DashboardService._get_default_stats() finally: db.close() @staticmethod def _calculate_stats(db) -> Dict[str, Any]: """Calculate dashboard statistics""" try: # Total militantes total_militantes = db.query(func.count(Militante.id)).scalar() # Total cotas (soma dos valores) total_cotas_result = db.query(func.sum(CotaMensal.valor_novo)).scalar() total_cotas = f"{total_cotas_result:.2f}" if total_cotas_result else "0.00" # Total de materiais vendidos total_materiais = db.query(func.count(MaterialVendido.id)).scalar() # Total de assinaturas ativas total_assinaturas = db.query(func.count(AssinaturaAnual.id)).scalar() # Últimos militantes cadastrados (limit 5) - eager load emails militantes_query = db.query(Militante).options( joinedload(Militante.emails) ).order_by(Militante.id.desc()).limit(5).all() # Convert militantes to dictionaries to avoid lazy loading issues ultimos_militantes = [] for militante in militantes_query: militante_dict = { 'id': militante.id, 'nome': militante.nome, 'emails': [{'endereco_email': email.endereco_email} for email in militante.emails] } ultimos_militantes.append(militante_dict) # Últimos pagamentos (limit 5) - eager load militante pagamentos_query = db.query(Pagamento).options( joinedload(Pagamento.militante) ).order_by(Pagamento.data_pagamento.desc()).limit(5).all() # Convert pagamentos to dictionaries to avoid lazy loading issues ultimos_pagamentos = [] for pagamento in pagamentos_query: pagamento_dict = { 'id': pagamento.id, 'valor': pagamento.valor, 'data_pagamento': pagamento.data_pagamento, 'militante': { 'id': pagamento.militante.id, 'nome': pagamento.militante.nome } } ultimos_pagamentos.append(pagamento_dict) # Estatísticas por período hoje = datetime.now().date() inicio_mes = hoje.replace(day=1) # Militantes cadastrados este mês militantes_mes = db.query(func.count(Militante.id)).filter( Militante.id >= 1 # Assuming ID is auto-increment ).scalar() # Pagamentos este mês pagamentos_mes = db.query(func.sum(Pagamento.valor)).filter( Pagamento.data_pagamento >= inicio_mes ).scalar() total_pagamentos_mes = f"{pagamentos_mes:.2f}" if pagamentos_mes else "0.00" return { 'total_militantes': total_militantes, 'total_cotas': total_cotas, 'total_materiais': total_materiais, 'total_assinaturas': total_assinaturas, 'ultimos_militantes': ultimos_militantes, 'ultimos_pagamentos': ultimos_pagamentos, 'militantes_mes': militantes_mes, 'pagamentos_mes': total_pagamentos_mes, 'cache_timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"Error calculating dashboard stats: {e}") return DashboardService._get_default_stats() @staticmethod def _get_default_stats() -> Dict[str, Any]: """Get default statistics when calculation fails""" return { 'total_militantes': 0, 'total_cotas': "0.00", 'total_materiais': 0, 'total_assinaturas': 0, 'ultimos_militantes': [], 'ultimos_pagamentos': [], 'militantes_mes': 0, 'pagamentos_mes': "0.00", 'cache_timestamp': datetime.now().isoformat() } @staticmethod @invalidate_cache_pattern("dashboard:*") def invalidate_dashboard_cache(): """Invalidate dashboard cache when data changes""" logger.info("Dashboard cache invalidated") @staticmethod @cached(expire=600, key_prefix="dashboard") # Cache for 10 minutes def get_militante_stats() -> Dict[str, Any]: """Get militante-specific statistics""" db = get_db_connection() try: # Militantes por estado estados = db.query(Militante.estado, func.count(Militante.id)).group_by(Militante.estado).all() # Militantes por responsabilidade responsabilidades = {} militantes = db.query(Militante).all() for militante in militantes: for resp in militante.get_responsabilidades(): responsabilidades[resp] = responsabilidades.get(resp, 0) + 1 return { 'estados': dict(estados), 'responsabilidades': responsabilidades, 'cache_timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"Error getting militante stats: {e}") return {'estados': {}, 'responsabilidades': {}, 'cache_timestamp': datetime.now().isoformat()} finally: db.close() @staticmethod @cached(expire=300, key_prefix="dashboard") def get_financial_stats() -> Dict[str, Any]: """Get financial statistics""" db = get_db_connection() try: # Total de pagamentos total_pagamentos = db.query(func.sum(Pagamento.valor)).scalar() # Pagamentos por mês (últimos 6 meses) hoje = datetime.now().date() stats_mensais = [] for i in range(6): inicio_mes = hoje.replace(day=1) - timedelta(days=30*i) fim_mes = inicio_mes.replace(day=28) + timedelta(days=4) fim_mes = fim_mes.replace(day=1) - timedelta(days=1) valor_mes = db.query(func.sum(Pagamento.valor)).filter( Pagamento.data_pagamento >= inicio_mes, Pagamento.data_pagamento <= fim_mes ).scalar() stats_mensais.append({ 'mes': inicio_mes.strftime('%Y-%m'), 'valor': float(valor_mes) if valor_mes else 0.0 }) return { 'total_pagamentos': float(total_pagamentos) if total_pagamentos else 0.0, 'stats_mensais': stats_mensais, 'cache_timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f"Error getting financial stats: {e}") return { 'total_pagamentos': 0.0, 'stats_mensais': [], 'cache_timestamp': datetime.now().isoformat() } finally: db.close() @staticmethod def obter_ultimos_militantes(limite: int = 5) -> List[Militante]: """Obtém os últimos militantes cadastrados""" db = get_db_connection() try: return db.query(Militante).order_by(Militante.id.desc()).limit(limite).all() finally: db.close() @staticmethod def obter_ultimos_pagamentos(limite: int = 5) -> List[Pagamento]: """Obtém os últimos pagamentos realizados""" db = get_db_connection() try: return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).limit(limite).all() finally: db.close() @staticmethod def obter_tipos_pagamento() -> List[TipoPagamento]: """Obtém todos os tipos de pagamento""" db = get_db_connection() try: return db.query(TipoPagamento).all() finally: db.close() @staticmethod def obter_dados_dashboard() -> Dict: """Obtém todos os dados necessários para o dashboard""" return { 'estatisticas': DashboardService.get_dashboard_stats(), 'ultimos_militantes': DashboardService.obter_ultimos_militantes(), 'ultimos_pagamentos': DashboardService.obter_ultimos_pagamentos(), 'tipos_pagamento': DashboardService.obter_tipos_pagamento(), 'data_atual': datetime.now().strftime("%d/%m/%Y") }