diff --git a/Dockerfile b/Dockerfile index 3460a0f..f15596a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,48 +1,48 @@ FROM alpine:latest -# Instalar dependências do sistema -RUN apk update && \ - apk add --no-cache \ - python3 \ - py3-pip \ - make \ - git \ - gcc \ - python3-dev \ - musl-dev \ - linux-headers - -# Criar link simbólico para python3 -RUN ln -sf python3 /usr/bin/python - -# Definir diretório de trabalho +# Diretório de trabalho WORKDIR /app -# Copiar arquivos do projeto +# UID/GID configuráveis para compatibilizar permissões em bind mounts Linux +ARG APP_UID=1000 +ARG APP_GID=1000 + +# Instalar Python no Alpine e criar alias `python` +RUN apk add --no-cache python3 py3-pip \ + && ln -sf python3 /usr/bin/python + +# Instalar dependências Python em venv usando build deps temporários +COPY requirements.txt . +RUN apk add --no-cache --virtual .build-deps \ + gcc \ + musl-dev \ + linux-headers \ + && python -m venv /venv \ + && /venv/bin/pip install --upgrade pip \ + && /venv/bin/pip install --no-cache-dir -r requirements.txt \ + && apk del .build-deps + +# Copiar código da aplicação COPY . . -# Criar e ativar ambiente virtual -RUN python -m venv /venv && \ - . /venv/bin/activate && \ - pip install --upgrade pip && \ - pip install -r requirements.txt +# Criar usuário sem privilégios e diretórios de escrita necessários +RUN addgroup -S -g "${APP_GID}" appgroup \ + && adduser -S -D -H -u "${APP_UID}" -G appgroup appuser \ + && mkdir -p /data /app/logs \ + && chown -R appuser:appgroup /app /data /venv + +# Ambiente padrão +ENV PATH="/venv/bin:$PATH" \ + FLASK_APP=app.py \ + FLASK_ENV=production \ + PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +# Rodar aplicação como usuário não-root +USER appuser # Expor a porta que o Flask usa EXPOSE 5000 -# Definir o ambiente virtual como padrão -ENV PATH="/venv/bin:$PATH" -ENV FLASK_APP=app.py -ENV FLASK_ENV=production - -# Criar script de inicialização -RUN echo '#!/bin/sh' > /app/start.sh && \ - echo 'echo "Inicializando banco de dados..."' >> /app/start.sh && \ - echo 'python init_db.py' >> /app/start.sh && \ - echo 'echo "Banco de dados inicializado!"' >> /app/start.sh && \ - echo 'echo "Iniciando aplicação..."' >> /app/start.sh && \ - echo 'exec gunicorn --bind 0.0.0.0:5000 app:app' >> /app/start.sh && \ - chmod +x /app/start.sh - # Comando para rodar a aplicação -CMD ["/app/start.sh"] +CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"] diff --git a/Makefile b/Makefile index a5a847b..fa34057 100644 --- a/Makefile +++ b/Makefile @@ -1,58 +1,126 @@ +.PHONY: install clean db-reset db-seed-fake db-seed-test-users admin-reset admin-rotate-otp \ + run run-gunicorn docker-db-reset docker-db-seed-fake docker-db-seed-test-users \ + docker-admin-reset docker-admin-rotate-otp docker-build docker-up docker-down docker-logs \ + docker-restart docker-db-reset-xplat docker-db-seed-fake-xplat docker-db-seed-test-users-xplat \ + docker-admin-reset-xplat docker-admin-rotate-otp-xplat docker-build-xplat docker-up-xplat \ + docker-down-xplat docker-logs-xplat cache-clear cache-status cache-keys dev-up dev-down \ + prod-build prod-up prod-logs cache-warmup cache-monitor + install: pip install -r requirements.txt clean: - rm -rf ~/.local/share/controles/database.db* + rm -f ~/.local/share/controles/database.db* + rm -f database.db* + rm -f data/database.db* rm -f admin_qr.png + rm -f data/admin_qr.png + rm -f /tmp/admin_qr.png -init-db: clean - python init_db.py +db-reset: clean + PYTHONUNBUFFERED=1 python -B scripts/manage.py db_reset -seed: init-db - python seed.py +# Apenas seed (seed_database.py) +db-seed-fake: + PYTHONUNBUFFERED=1 python -B scripts/manage.py db_seed_fake -init: - python app.py --init +# Apenas seed (create_test_users.py) +db-seed-test-users: + PYTHONUNBUFFERED=1 python -B scripts/manage.py db_seed_test_users +# Busca o OTP padrão +admin-reset: + PYTHONUNBUFFERED=1 python -B scripts/manage.py admin_reset + +# Novo OTP +admin-rotate-otp: + PYTHONUNBUFFERED=1 python -B scripts/manage.py admin_rotate_otp + +# Server padrão do python run: - python app.py + PYTHONUNBUFFERED=1 python -B app.py -run-with-seed: seed init run - -reset-admin: clean - python create_admin.py +# server padrão de produção (recomendado) +run-gunicorn: + PYTHONUNBUFFERED=1 python -B -m gunicorn --bind 0.0.0.0:5000 app:app # Docker commands +docker-db-reset: + mkdir -p data logs + docker-compose -f docker-compose.yml exec app python -B scripts/manage.py db_reset + +docker-db-seed-fake: + docker-compose -f docker-compose.yml exec app python -B scripts/manage.py db_seed_fake + +docker-db-seed-test-users: + docker-compose -f docker-compose.yml exec app python -B scripts/manage.py db_seed_test_users + +docker-admin-reset: + docker-compose -f docker-compose.yml exec app python -B scripts/manage.py admin_reset + +docker-admin-rotate-otp: + docker-compose -f docker-compose.yml exec app python -B scripts/manage.py admin_rotate_otp + docker-build: - docker-compose build + mkdir -p data logs + docker-compose -f docker-compose.yml build docker-up: - docker-compose up -d + mkdir -p data logs + docker-compose -f docker-compose.yml up -d docker-down: - docker-compose down + docker-compose -f docker-compose.yml down docker-logs: - docker-compose logs -f + docker-compose -f docker-compose.yml logs -f docker-restart: - docker-compose restart + docker-compose -f docker-compose.yml restart + +# Docker commands (fallback cross-platform) +docker-db-reset-xplat: + docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py db_reset + +docker-db-seed-fake-xplat: + docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py db_seed_fake + +docker-db-seed-test-users-xplat: + docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py db_seed_test_users + +docker-admin-reset-xplat: + docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py admin_reset + +docker-admin-rotate-otp-xplat: + docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py admin_rotate_otp + +docker-build-xplat: + mkdir -p data logs + docker-compose -f docker-compose.crossplatform.yml build + +docker-up-xplat: + docker-compose -f docker-compose.crossplatform.yml up -d + +docker-down-xplat: + docker-compose -f docker-compose.crossplatform.yml down + +docker-logs-xplat: + docker-compose -f docker-compose.crossplatform.yml logs -f # Redis cache commands cache-clear: - docker-compose exec redis redis-cli FLUSHDB + docker-compose -f docker-compose.yml exec redis redis-cli FLUSHDB cache-status: - docker-compose exec redis redis-cli INFO + docker-compose -f docker-compose.yml exec redis redis-cli INFO cache-keys: - docker-compose exec redis redis-cli KEYS "*" + docker-compose -f docker-compose.yml exec redis redis-cli KEYS "*" # Development with Docker dev-up: docker-build docker-up @echo "Development environment started with Redis cache" @echo "Application: http://localhost:5000" - @echo "Redis: localhost:6379" dev-down: docker-down @echo "Development environment stopped" @@ -77,4 +145,4 @@ cache-warmup: cache-monitor: @echo "Monitoring Redis cache..." - watch -n 5 'docker-compose exec redis redis-cli INFO memory' + watch -n 5 'docker-compose -f docker-compose.yml exec redis redis-cli INFO memory' diff --git a/README.md b/README.md index 11713c4..3259ed7 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,258 @@ -# Sistema de Controle de Militantes +# Sistema de Controles OCI -Sistema para gerenciamento de militantes, células, setores e comitês regionais. +Sistema web para gestão organizacional (militantes, estrutura hierárquica, cotas, pagamentos e materiais), com autenticação por senha + OTP, permissões RBAC e cache Redis. + +## 🔧 Tecnologias + +- **Backend**: Flask 2.3.3 +- **Frontend**: Bootstrap 5, HTML5, CSS3, JavaScript +- **Database**: SQLite + SQLAlchemy 2.0.21 +- **Cache**: Redis 7.4.4 (opcional fora do Docker) +- **Authentication**: Flask-Login + OTP (pyotp) +- **Container**: Docker + Docker Compose +- **Server**: Gunicorn + +## 🚀 Status Atual + +- Sistema com Arquitetura de Permissões (RBAC) +- Sistema de permissões implementado no nível de dados +- Estrutura organizacional completa +- Aplicação Flask rodando com Docker +- Redis cache integrado e funcionando +- Banco de dados SQLite inicializado +- Usuário admin configurado com OTP +- 30 militantes de teste criados +- Menus sempre visíveis, controle transparente + +## 🏗️ Arquitetura de Permissões + +O sistema implementa uma estratégia de controle de permissões no **nível de dados**, garantindo que: + +- **Menus permanecem sempre visíveis** - Não há restrições na interface +- **Dados são filtrados por hierarquia** - Admin → CC → CR → Setor → Célula +- **Templates nunca quebram** - Sempre renderizam, mesmo com dados vazios + +## ⚙️ Instalação - Pré-requisitos + +- Docker + Docker Compose (para fluxo com containers) +- Porta 5000 disponível para a aplicação +- Porta 6379 disponível para Redis +- Python 3.10+ (recomendado) +- `pip` +- `make` + +## 🐳 Primeiro Inicio com Docker (recomendado) + +### 0. Clone o repositorio + +```bash +git clone git@gitea.comunatec.org:comunatec/controles.git +cd controles +``` + +### 1. Resete o banco + +```bash +make docker-db-reset +``` + +### 2. Adicione dados fakes para testes (opcional) + +```bash +make docker-db-seed-fake +``` + +### 3. Subir aplicação + +```bash +make dev-up +``` + +### 4. Acompanhar logs + +- Aplicação: `logs/controles.log` +- Cache: `logs/cache.log` +- Docker: `docker-compose logs` + +```bash +make docker-logs # Toda a aplicação +docker-compose logs redis # Somente o redis +make cache-status # INFO do redis +``` + +### 5. Descer aplicação + +```bash +make dev-down +``` + +## 🐍 Primeiro Inicio - Execução Local (Sem Docker) + +### 0. Clone o repositorio + +```bash +git clone git@gitea.comunatec.org:comunatec/controles.git +cd controles +``` + +### 1. Ambiente Python + +```bash +python -m venv .venv +source .venv/bin/activate # Linux/Mac +# ou +venv\Scripts\activate # Windows +pip install -r requirements.txt +``` + +### 2. Crie o `.env` na raiz + +Exemplo: + +```env +# Usando OTP padrão para não trocar toda hora no desenvolvimento +ADMIN_OTP_SECRET=JBSWY3DPEHPK3PXP + +# Para usar o mesmo banco que o Docker (Linux/WSL permite bind-mount) +DATABASE_URL=sqlite:///data/database.db +REDIS_URL=redis://redis:6379/0 + +FLASK_APP=app.py +FLASK_ENV=development +SECRET_KEY=troque_esta_chave + +APP_UID=1000 +APP_GID=1000 + +MAIL_SERVER=seu_servidor_smtp +MAIL_PORT=587 +MAIL_USE_TLS=True +MAIL_USERNAME=seu_email +MAIL_PASSWORD=sua_senha +``` + +Se Redis não estiver disponível localmente, a aplicação continua rodando sem cache. + +### 3. Inicialize banco e rode + +```bash +make db-reset +make db-seed-fake # opcional +make run +# ou +make run-gunicorn # server de produção +``` + +## 🔐 Acesso ao Sistema + +### Credenciais do Admin +- **URL**: http://localhost:5000 +- **Usuário**: admin +- **Senha**: admin123 +- **OTP Secret**: JBSWY3DPEHPK3PXP + +### Configuração OTP + +1. Instale um aplicativo autenticador (Google Authenticator, Microsoft Authenticator) + +2. Configure manualmente: + - Descrição da chave (Codinome): Controles-OCI-admin + - Segredo OTP (Sua Chave): JBSWY3DPEHPK3PXP + - Tipo: TOTP + - Algoritmo: SHA1 + - Dígitos: 6 + - Intervalo: 30 segundos + +**OU** use o QR Code gerado em `/tmp/admin_qr.png` ou `/data/admin_qr.png` ou `admin_qr.png`. + +PS: Google Authenticator só tem "Codinome" e "Sua Chave" de config, e tá tudo bem. + +## Testes Automatizados + +```bash +# ambiente já configurado +pip install -r tests/requirements-test.txt +pytest +``` + +Também existe `run_tests.sh`, que monta um venv e executa a suíte automaticamente. + +- TODO: Talvez trocar o nome para venv_test + +## 📁 Estrutura de arquivos + +O sistema busca seguir padrão MVC (Model-View-Controller), atualmente está: + +``` +controles/ +├── controllers/ # Controladores (lógica de rotas) +├── data/ # Banco de dados (e talvez qr_code admin) +├── docs/ # Documentações da arquitetura +├── functions/ # Funções utilitárias +├── logs/ # Logs d aplicação, redis... +├── migrations/ # Alterações de banco para não perder dados (produção) +├── models/ # Modelos (operações de banco) +├── routes/ # Rotas de aplicação +├── scripts/ # Scripts de gerenciamento +├── services/ # Serviços (lógica de negócio) +├── sql/ # Migrate para o rbac +├── static/ # Arquivos estáticos (icon/css/js) +├── templates/ # Views (templates HTML) +├── tests/ # Testes automatizados +├── utils/ # Funções sem regra de negócio ou dependencia de domínio +├── app.py # Ponto de entrada da aplicação +├── docker-compose.yml # Configuração Docker +├── Dockerfile # Imagem Docker +└── requirements.txt # Dependências Python +``` + +- TODO: temos duas rotas (routes e controllers)? Unificar futuramente. +- TODO: sql/migrate_db parece utilizar outro banco. + +## 🤝 Contribuição + +1. Crie uma branch para sua feature +2. Commit suas mudanças +3. Push sua branch para o Gitea +4. Outro camarada verifica a branch +5. Abra um Pull Request para a branch solicitada + +## 📄 Licença + +Este projeto é privado para uso da OCI. + +## 🔍 Troubleshooting + +1. **Redis não conecta** + + ```bash + docker-compose logs redis + docker-compose restart redis + ``` + - Redis está indisponível localmente, mas app continua executando mesmo fora do Docker. + +2. **Cache não funciona** + + ```bash + make cache-status + make cache-clear + ``` + +3. **Aplicação não inicia** + + ```bash + docker-compose logs app + docker-compose down && docker-compose up -d + ``` + +4. **Modificações no banco local não alteram o banco no Docker** + + - Linux bind mount no grupo de usuario errado: ajuste `APP_UID`/`APP_GID` no `Dockerfile` para seu grupo de usuarios (padrão=1000). + - Docker com engine do Windows não consegue fazer bind mount, então alterações no banco local não refletem no banco do Docker, use as operações dentro do docker com make docker-* ou no windows instale o wsl2 e instale o Docker com apenas a engine "Docker no WSL". ## Estrutura de Permissões (RBAC) -O sistema utiliza um sistema de controle de acesso baseado em papéis (RBAC) com a seguinte hierarquia: +O sistema utiliza um sistema de controle de acesso baseado em papéis (RBAC), onde a verificação de ações são feitas com permissões (permission), e as permissões são pré-definidas com base em papeis (role). Que possuem a seguinte hierarquia: ### Níveis de Papéis @@ -47,41 +295,7 @@ O sistema utiliza um sistema de controle de acesso baseado em papéis (RBAC) com - Criar CRs - Configurar sistema -## Instalação - -1. Clone o repositório -2. Crie um ambiente virtual: - ```bash - python -m venv venv - source venv/bin/activate # Linux/Mac - # ou - venv\Scripts\activate # Windows - ``` -3. Instale as dependências: - ```bash - pip install -r requirements.txt - ``` -4. Execute as migrações do banco de dados: - ```bash - python sql/migrate_db.py - ``` -5. Configure as variáveis de ambiente no arquivo `.env`: - ``` - FLASK_APP=app.py - FLASK_ENV=development - SECRET_KEY=sua_chave_secreta - MAIL_SERVER=seu_servidor_smtp - MAIL_PORT=587 - MAIL_USE_TLS=True - MAIL_USERNAME=seu_email - MAIL_PASSWORD=sua_senha - ``` -6. Execute o aplicativo: - ```bash - flask run - ``` - -## Uso +## Uso do RBAC ### Decoradores de Permissão @@ -99,31 +313,32 @@ O sistema fornece três decoradores para controle de acesso: - Verifica se o usuário tem um papel com nível mínimo - Exemplo: `@require_minimum_role(Role.SECRETARIO_CR)` -### Verificando Permissões no Código +### Verificando Permissões e Papéis no Código ```python # Verificar se um usuário tem uma permissão -if user.has_permission('create_cell_member'): +if user.has_permission(Permission.CREATE_CELL_MEMBER): # Faça algo # Verificar se um usuário tem um papel -if user.has_role('Secretário de Célula'): +if user.has_role(Role.SECRETARIO_CELULA): # Faça algo # Obter o papel mais alto do usuário highest_role = user.get_highest_role() -if highest_role and highest_role.nivel >= Role.SECRETARIO_CR: + +# Verificar se o usuário tem nível secretário de célula ou superior +if user.has_minimum_role(Role.SECRETARIO_CELULA): # Faça algo ``` -## Estrutura do Banco de Dados +## Documentação Complementar -O sistema utiliza as seguintes tabelas para o RBAC: - -- `roles`: Armazena os papéis disponíveis -- `permissions`: Armazena as permissões disponíveis -- `role_permissions`: Mapeia papéis para permissões -- `user_roles`: Mapeia usuários para papéis +- Documentação complementar: `docs/README.md` +- RBAC: `docs/rbac.md` +- Estratégia de permissões: `docs/permission_strategy.md` +- Redis e cache: `docs/redis_cache_setup.md` +- Histórico de correções de permissões: `docs/permission_fixes_summary.md` ## Segurança @@ -131,4 +346,4 @@ O sistema utiliza as seguintes tabelas para o RBAC: - Sessões expiram após período de inatividade - Controle de acesso granular baseado em papéis - Proteção contra CSRF -- Validação de entrada de dados +- Validação de entrada de dados \ No newline at end of file diff --git a/app.py b/app.py index fd4baa5..90a318a 100644 --- a/app.py +++ b/app.py @@ -1,19 +1,24 @@ +import os +import secrets +import sys +import logging +import time +from pathlib import Path from flask import Flask from flask_bootstrap5 import Bootstrap from flask_login import LoginManager from flask_wtf.csrf import CSRFProtect from flask_mail import Mail -from functions.database import get_db_connection, Usuario -from functions.rbac import init_rbac -from functions.template_helpers import permission_context_processor, init_template_filters, safe_render_helper from sqlalchemy.orm import joinedload -import os -import secrets from dotenv import load_dotenv -import sys -import logging + +# Carregar .env antes de importar módulos +load_dotenv(Path(__file__).resolve().parent / ".env") + +from functions.database import get_db_session, Usuario +from functions.rbac import Role +from functions.template_helpers import permission_context_processor, init_template_filters, safe_render_helper from logging.handlers import RotatingFileHandler -import time # Importar blueprints from controllers.auth_controller import auth_bp @@ -28,8 +33,6 @@ from routes.admin import admin_bp # Import cache service from services.cache_service import cache_service -load_dotenv() - def setup_logging(app): """Configure logging for the application""" if not app.debug and not app.testing: @@ -99,12 +102,11 @@ def create_app(): @login_manager.user_loader def load_user(user_id): - """Carrega o usuário pelo ID""" - db = get_db_connection() + """Carrega o usuário pelo ID com roles e permissions (eager).""" + db = get_db_session() try: - # Carregar o usuário com suas roles user = db.query(Usuario).options( - joinedload(Usuario.roles) + joinedload(Usuario.roles).joinedload(Role.permissions) ).get(user_id) return user finally: @@ -162,21 +164,6 @@ def create_app(): return app -def init_system(): - """Inicializa o sistema""" - print("Inicializando sistema...") - - # Inicializar RBAC - print("Inicializando RBAC...") - init_rbac() - - # Criar usuário admin se não existir - from create_admin import create_admin_user - print("Criando usuário admin...") - create_admin_user() - - print("Sistema inicializado com sucesso!") - def main(): """Função principal""" # Criar a aplicação @@ -187,12 +174,13 @@ def main(): app = main() if __name__ == '__main__': - # Verificar se é para inicializar o sistema - if '--init' in sys.argv: - init_system() - else: - app.run( - host='0.0.0.0', - port=5000, - debug=os.getenv('FLASK_ENV') == 'development' - ) + if len(sys.argv) > 1: + print("app.py não aceita argumentos.") + print("Use 'python scripts/manage.py --help' para comandos administrativos.") + raise SystemExit(2) + + app.run( + host='0.0.0.0', + port=5000, + debug=os.getenv('FLASK_ENV') == 'development' + ) \ No newline at end of file diff --git a/controllers/auth_controller.py b/controllers/auth_controller.py index ff11173..6b920b1 100644 --- a/controllers/auth_controller.py +++ b/controllers/auth_controller.py @@ -1,13 +1,10 @@ from flask import Blueprint, request, render_template, redirect, url_for, flash, session, jsonify from flask_login import login_user, logout_user, current_user from datetime import datetime -from functions.database import get_db_connection, Usuario +from functions.database import Militante, get_db_session, Usuario from functions.decorators import require_login from werkzeug.security import generate_password_hash -import pyotp -import qrcode -import base64 -from io import BytesIO +from services.otp_service import generate_qr_code_base64 auth_bp = Blueprint('auth', __name__) @@ -30,7 +27,7 @@ def login(): flash("Email/usuário e senha são obrigatórios.", "danger") return redirect(url_for("auth.login")) - db = get_db_connection() + db = get_db_session() try: # Tenta encontrar o usuário por email ou username user = db.query(Usuario).filter( @@ -105,7 +102,7 @@ def api_login(): 'error': 'Email/username e senha são obrigatórios' }), 400 - db = get_db_connection() + db = get_db_session() try: # Buscar usuário user = db.query(Usuario).filter( @@ -182,7 +179,7 @@ def api_logout(): """Endpoint de logout API""" try: if current_user.is_authenticated: - db = get_db_connection() + db = get_db_session() try: user = current_user user.logout() @@ -226,7 +223,7 @@ def api_status(): @auth_bp.route("/logout") @require_login def logout(): - db = get_db_connection() + db = get_db_session() try: user = current_user if user: @@ -255,7 +252,7 @@ def alterar_senha(): flash("As senhas não coincidem.", "error") return redirect(url_for("auth.alterar_senha")) - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(current_user.id) if not user.check_password(senha_atual): @@ -274,31 +271,14 @@ def alterar_senha(): @auth_bp.route("/qr/") def get_qr_code(token): """Gera QR code para configuração OTP""" - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).filter_by(temp_token=token).first() if not militante or militante.temp_token_expiry < datetime.now(): flash('Token inválido ou expirado.', 'danger') return redirect(url_for('auth.login')) - qr_code = generate_qr_code(militante) + qr_code = generate_qr_code_base64(militante) return render_template('mostrar_qr_code.html', qr_code=qr_code) finally: db.close() - -def generate_qr_code(user): - """Gera um QR code para o usuário""" - if not user.otp_secret: - user.otp_secret = pyotp.random_base32() - - totp = pyotp.TOTP(user.otp_secret) - qr = qrcode.QRCode(version=1, box_size=10, border=5) - qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles")) - qr.make(fit=True) - - img = qr.make_image(fill_color="black", back_color="white") - buffer = BytesIO() - img.save(buffer, format="PNG") - qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8') - - return qr_code \ No newline at end of file diff --git a/controllers/cota_controller.py b/controllers/cota_controller.py index 193f435..7c01f9d 100644 --- a/controllers/cota_controller.py +++ b/controllers/cota_controller.py @@ -1,5 +1,5 @@ from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify -from functions.database import get_db_connection, CotaMensal, Militante +from functions.database import get_db_session, CotaMensal, Militante from functions.decorators import require_login from utils.date_utils import validar_data, converter_data from datetime import datetime @@ -12,6 +12,7 @@ cota_bp = Blueprint('cota', __name__) def novo(): """Cria uma nova cota mensal""" if request.method == "POST": + db = get_db_session() try: militante_id = request.form.get("militante_id") valor_antigo = float(request.form.get("valor_antigo")) @@ -23,7 +24,6 @@ def novo(): flash('Data inválida ou futura', 'danger') return redirect(url_for('cota.novo')) - db = get_db_connection() cota = CotaMensal( militante_id=militante_id, valor_antigo=valor_antigo, @@ -44,7 +44,7 @@ def novo(): db.close() # GET - Renderizar formulário - db = get_db_connection() + db = get_db_session() try: militantes = db.query(Militante).order_by(Militante.nome).all() return render_template("nova_cota.html", militantes=militantes) @@ -55,7 +55,7 @@ def novo(): @require_login def listar(): """Lista todas as cotas mensais com controle de permissões no nível de dados""" - db = get_db_connection() + db = get_db_session() try: # SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões cotas = [] @@ -82,7 +82,7 @@ def listar(): @require_login def editar(id): """Edita uma cota mensal""" - db = get_db_connection() + db = get_db_session() try: cota = db.query(CotaMensal).get(id) if not cota: @@ -114,7 +114,7 @@ def editar(id): @require_login def excluir(id): """Exclui uma cota mensal""" - db = get_db_connection() + db = get_db_session() try: cota = db.query(CotaMensal).get(id) if not cota: diff --git a/controllers/home_controller.py b/controllers/home_controller.py index c8c0458..6ada27e 100644 --- a/controllers/home_controller.py +++ b/controllers/home_controller.py @@ -1,5 +1,5 @@ from flask import Blueprint, render_template, flash, redirect, url_for, jsonify -from functions.database import get_db_connection, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento +from functions.database import get_db_session, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento from functions.decorators import require_login from datetime import datetime from sqlalchemy import func @@ -28,7 +28,7 @@ def dashboard(): stats = DashboardService.get_dashboard_stats() # Get tipos de pagamento for the modal - db = get_db_connection() + db = get_db_session() try: tipos_pagamento = db.query(TipoPagamento).all() finally: diff --git a/controllers/material_controller.py b/controllers/material_controller.py index f39b5f9..2fdd727 100644 --- a/controllers/material_controller.py +++ b/controllers/material_controller.py @@ -1,5 +1,5 @@ from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify -from functions.database import get_db_connection, MaterialVendido, Militante, TipoMaterial +from functions.database import get_db_session, MaterialVendido, Militante, TipoMaterial from functions.decorators import require_login from utils.date_utils import validar_data, converter_data from datetime import datetime @@ -11,7 +11,7 @@ material_bp = Blueprint('material', __name__) @require_login def listar(): """Lista todos os materiais com controle de permissões no nível de dados""" - db = get_db_connection() + db = get_db_session() try: # SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões materiais = [] @@ -46,6 +46,7 @@ def listar(): def novo(): """Cria um novo material vendido""" if request.method == "POST": + db = get_db_session() try: militante_id = request.form.get("militante_id") tipo_material_id = request.form.get("tipo_material_id") @@ -57,7 +58,6 @@ def novo(): flash('Data de venda inválida ou futura', 'danger') return redirect(url_for('material.novo')) - db = get_db_connection() material = MaterialVendido( militante_id=militante_id, tipo_material_id=tipo_material_id, @@ -77,7 +77,7 @@ def novo(): db.close() # GET - Renderizar formulário - db = get_db_connection() + db = get_db_session() try: militantes = db.query(Militante).order_by(Militante.nome).all() tipos_material = db.query(TipoMaterial).order_by(TipoMaterial.descricao).all() @@ -89,7 +89,7 @@ def novo(): @require_login def editar(id): """Edita um material vendido""" - db = get_db_connection() + db = get_db_session() try: material = db.query(MaterialVendido).get(id) if not material: @@ -122,7 +122,7 @@ def editar(id): @require_login def excluir(id): """Exclui um material vendido""" - db = get_db_connection() + db = get_db_session() try: material = db.query(MaterialVendido).get(id) if not material: @@ -146,7 +146,7 @@ def excluir(id): @require_login def listar_tipos(): """Lista todos os tipos de materiais com controle de permissões no nível de dados""" - db = get_db_connection() + db = get_db_session() try: # SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões tipos_materiais = [] @@ -174,10 +174,10 @@ def listar_tipos(): def novo_tipo(): """Cria um novo tipo de material""" if request.method == "POST": + db = get_db_session() try: descricao = request.form.get("descricao") - db = get_db_connection() tipo = TipoMaterial(descricao=descricao) db.add(tipo) db.commit() @@ -196,7 +196,7 @@ def novo_tipo(): @require_login def editar_tipo(id): """Edita um tipo de material""" - db = get_db_connection() + db = get_db_session() try: tipo = db.query(TipoMaterial).get(id) if not tipo: @@ -222,7 +222,7 @@ def editar_tipo(id): @require_login def excluir_tipo(id): """Exclui um tipo de material""" - db = get_db_connection() + db = get_db_session() try: tipo = db.query(TipoMaterial).get(id) if not tipo: diff --git a/controllers/militante_controller.py b/controllers/militante_controller.py index 8383494..53fdacd 100644 --- a/controllers/militante_controller.py +++ b/controllers/militante_controller.py @@ -1,5 +1,5 @@ from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify -from functions.database import get_db_connection, Militante, EmailMilitante, Endereco, Celula, Setor, ComiteRegional +from functions.database import get_db_session, safe_rollback, Militante, EmailMilitante, Endereco, Celula, Setor, ComiteRegional from functions.decorators import require_login from functions.validations import validar_cpf from functions.rbac import Permission @@ -14,6 +14,7 @@ militante_bp = Blueprint('militante', __name__) @require_login def criar(): """Cria um novo militante""" + db = get_db_session() try: data = request.get_json() @@ -29,9 +30,7 @@ def criar(): 'status': 'error', 'message': 'CPF inválido' }), 400 - - db = get_db_connection() - + # Verificar se CPF já existe if db.query(Militante).filter_by(cpf=data['cpf']).first(): return jsonify({ @@ -104,7 +103,7 @@ def criar(): @require_login def listar(): """Lista todos os militantes com controle de permissões no nível de dados""" - db = get_db_connection() + db = get_db_session() try: # SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões militantes = [] @@ -182,7 +181,7 @@ def listar(): @require_login def excluir(id): """Exclui um militante""" - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).get(id) if not militante: @@ -211,10 +210,9 @@ def excluir(id): @require_login def editar(militante_id): """Edita um militante existente""" + db = get_db_session() try: data = request.get_json() - - db = get_db_connection() militante = db.query(Militante).get(militante_id) if not militante: @@ -283,7 +281,7 @@ def editar(militante_id): @require_login def buscar_dados(militante_id): """Busca os dados de um militante específico""" - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).options( joinedload(Militante.emails), @@ -359,7 +357,7 @@ def buscar_dados(militante_id): @require_login def get_setores(cr_id): """Retorna setores de um CR específico""" - db = get_db_connection() + db = get_db_session() try: setores = db.query(Setor).filter_by(cr_id=cr_id).all() return jsonify([{'id': s.id, 'nome': s.nome} for s in setores]) diff --git a/controllers/pagamento_controller.py b/controllers/pagamento_controller.py index e890934..e8eed39 100644 --- a/controllers/pagamento_controller.py +++ b/controllers/pagamento_controller.py @@ -1,5 +1,5 @@ from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify -from functions.database import get_db_connection, Pagamento, Militante, TipoPagamento +from functions.database import get_db_session, Pagamento, Militante, TipoPagamento from functions.decorators import require_login from utils.date_utils import validar_data, converter_data from datetime import datetime @@ -12,6 +12,7 @@ pagamento_bp = Blueprint('pagamento', __name__) def novo(): """Cria um novo pagamento""" if request.method == "POST": + db = get_db_session() try: militante_id = request.form.get("militante_id") tipo_pagamento_id = request.form.get("tipo_pagamento_id") @@ -22,7 +23,6 @@ def novo(): flash('Data de pagamento inválida ou futura', 'danger') return redirect(url_for('pagamento.novo')) - db = get_db_connection() pagamento = Pagamento( militante_id=militante_id, tipo_pagamento_id=tipo_pagamento_id, @@ -41,7 +41,7 @@ def novo(): db.close() # GET - Renderizar formulário - db = get_db_connection() + db = get_db_session() try: militantes = db.query(Militante).order_by(Militante.nome).all() tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all() @@ -53,7 +53,7 @@ def novo(): @require_login def listar(): """Lista todos os pagamentos com controle de permissões no nível de dados""" - db = get_db_connection() + db = get_db_session() try: # SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões pagamentos = [] @@ -88,13 +88,13 @@ def listar(): def adicionar(): """Adiciona um novo pagamento""" if request.method == "POST": + db = get_db_session() try: militante_id = request.form.get("militante_id") tipo_pagamento = request.form.get("tipo_pagamento") valor = float(request.form.get("valor")) data_pagamento = converter_data(request.form.get("data_pagamento")) - db = get_db_connection() pagamento = Pagamento( militante_id=militante_id, tipo_pagamento=tipo_pagamento, @@ -116,7 +116,7 @@ def adicionar(): @require_login def list_pagamentos_celula(celula_id): """Lista pagamentos de uma célula específica""" - db = get_db_connection() + db = get_db_session() try: pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) @@ -127,7 +127,7 @@ def list_pagamentos_celula(celula_id): @require_login def list_pagamentos_setor(setor_id): """Lista pagamentos de um setor específico""" - db = get_db_connection() + db = get_db_session() try: pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) @@ -138,7 +138,7 @@ def list_pagamentos_setor(setor_id): @require_login def list_pagamentos_cr(cr_id): """Lista pagamentos de um CR específico""" - db = get_db_connection() + db = get_db_session() try: pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all() return render_template('pagamentos/list.html', pagamentos=pagamentos) @@ -150,7 +150,7 @@ def list_pagamentos_cr(cr_id): def novo_pagamento_celula(celula_id): """Cria novo pagamento para uma célula""" if request.method == 'POST': - db = get_db_connection() + db = get_db_session() try: pagamento = Pagamento( valor=request.form['valor'], @@ -171,7 +171,7 @@ def novo_pagamento_celula(celula_id): def novo_pagamento_setor(setor_id): """Cria novo pagamento para um setor""" if request.method == 'POST': - db = get_db_connection() + db = get_db_session() try: pagamento = Pagamento( valor=request.form['valor'], @@ -192,7 +192,7 @@ def novo_pagamento_setor(setor_id): def novo_pagamento_cr(cr_id): """Cria novo pagamento para um CR""" if request.method == 'POST': - db = get_db_connection() + db = get_db_session() try: pagamento = Pagamento( valor=request.form['valor'], diff --git a/controllers/usuario_controller.py b/controllers/usuario_controller.py index 07df661..4c3e821 100644 --- a/controllers/usuario_controller.py +++ b/controllers/usuario_controller.py @@ -1,5 +1,5 @@ from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify -from functions.database import get_db_connection, Usuario, Role, Setor +from functions.database import get_db_session, Usuario, Role, Setor from functions.decorators import require_login from flask_login import current_user import pyotp @@ -18,7 +18,7 @@ def novo(): setor_id = request.form.get("setor_id") # Verificar se usuário já existe - db = get_db_connection() + db = get_db_session() try: if db.query(Usuario).filter_by(username=username).first(): flash('Nome de usuário já existe.', 'danger') @@ -45,7 +45,7 @@ def novo(): finally: db.close() - db = get_db_connection() + db = get_db_session() try: roles = db.query(Role).order_by(Role.nome).all() setores = db.query(Setor).order_by(Setor.nome).all() @@ -63,7 +63,7 @@ def toggle_status(user_id): 'error': 'Você não tem permissão para alterar o status de usuários.' }), 403 - db = get_db_connection() + db = get_db_session() try: usuario = db.query(Usuario).get(user_id) if not usuario: @@ -105,7 +105,7 @@ def alterar_nivel(user_id): 'error': 'Novo nível não especificado.' }), 400 - db = get_db_connection() + db = get_db_session() try: usuario = db.query(Usuario).get(user_id) if not usuario: @@ -150,7 +150,7 @@ def toggle_quadro_orientador(user_id): 'error': 'Você não tem permissão para alterar responsabilidades de usuários.' }), 403 - db = get_db_connection() + db = get_db_session() try: usuario = db.query(Usuario).get(user_id) if not usuario: diff --git a/create_admin.py b/create_admin.py deleted file mode 100644 index 5e54eca..0000000 --- a/create_admin.py +++ /dev/null @@ -1,171 +0,0 @@ -from functions.database import init_database, Usuario, Role, get_db_connection -import qrcode -import os -from pathlib import Path -import pyotp - -def generate_qr_code(user): - """ - Gera o QR code para um usuário específico - - Args: - user: Instância do modelo Usuario - - Returns: - tuple: (caminho do arquivo, URI do OTP) - """ - # Tentar diferentes caminhos para salvar o QR code - qr_paths = [ - Path('/tmp/admin_qr.png'), # Diretório temporário do sistema - Path('admin_qr.png'), # Diretório atual - Path('/app/admin_qr.png') # Diretório da aplicação - ] - - # Gerar e salvar QR Code - qr = qrcode.QRCode(version=1, box_size=10, border=5) - - # Gerar URI do OTP - totp = pyotp.TOTP(user.otp_secret) - otp_uri = totp.provisioning_uri( - name=user.username, - issuer_name="Sistema de Controles" - ) - - qr.add_data(otp_uri) - qr.make(fit=True) - img = qr.make_image(fill_color="black", back_color="white") - - # Tentar salvar em diferentes locais - qr_saved = False - saved_path = None - - for qr_path in qr_paths: - try: - # Tentar salvar o arquivo - img.save(str(qr_path)) - print(f"QR Code salvo em: {qr_path}") - qr_saved = True - saved_path = qr_path - break - except Exception as e: - print(f"Não foi possível salvar o QR code em {qr_path}: {e}") - continue - - if not qr_saved: - print("AVISO: Não foi possível salvar o QR code em nenhum local") - print("O QR code pode ser gerado manualmente usando o URI OTP") - saved_path = None - - return saved_path, otp_uri - -def create_admin_user(): - """Cria ou atualiza o usuário admin""" - try: - # Inicializar banco de dados - init_database() - - # Criar sessão - db = get_db_connection() - - try: - # Verificar se já existe um usuário admin - admin = db.query(Usuario).filter_by(username="admin").first() - - if admin: - print("\n=== Usuário Admin Encontrado ===") - if not admin.otp_secret: - print("Gerando novo segredo OTP...") - admin.generate_otp_secret() - db.commit() - else: - print("\n=== Criando Novo Usuário Admin ===") - # Criar novo usuário admin - admin = Usuario( - username="admin", - email="admin@example.com", - is_admin=True - ) - admin.set_password("admin123") - admin.generate_otp_secret() - - # Buscar ou criar role de admin - admin_role = db.query(Role).filter_by(nome="admin").first() - if not admin_role: - admin_role = Role(nome="admin", nivel=0) # Nível 0 é o mais alto - db.add(admin_role) - - # Adicionar role ao usuário - admin.roles.append(admin_role) - - # Adicionar e fazer commit - db.add(admin) - db.commit() - - # Gerar QR code - qr_path, otp_uri = generate_qr_code(admin) - - if qr_path: - print("\n=== QR Code Gerado ===") - print(f"QR Code salvo em: {qr_path}") - print(f"URI do OTP: {otp_uri}") - else: - print("\n=== QR Code Não Pode Ser Salvo ===") - print("Use o URI OTP para configuração manual:") - print(f"URI do OTP: {otp_uri}") - - # Mostrar informações - print("\n=== Informações do Admin ===") - print(f"Username: {admin.username}") - print(f"Email: {admin.email}") - print(f"Senha: admin123") - print(f"Segredo OTP: {admin.otp_secret}") - - # Gerar código atual para verificação - totp = pyotp.TOTP(admin.otp_secret) - current_code = totp.now() - print("\n=== Verificação do OTP ===") - print(f"Código OTP atual: {current_code}") - print(f"Verificação do código: {totp.verify(current_code)}") - - print("\n=== Instruções para Configuração ===") - print("1. Instale um aplicativo autenticador no seu celular") - print(" (Google Authenticator, Microsoft Authenticator, etc)") - print("2. Abra o aplicativo") - print("3. Selecione a opção para adicionar uma nova conta") - if qr_path: - print("4. Escaneie o QR Code salvo em:", qr_path) - print("\nOU configure manualmente:") - print(f"- Nome da conta: {admin.username}") - print(f"- Segredo: {admin.otp_secret}") - print("- Tipo: Baseado em tempo (TOTP)") - print("- Algoritmo: SHA1") - print("- Dígitos: 6") - print("- Intervalo: 30 segundos") - - # Verificação final - print("\n=== Teste de Verificação ===") - test_code = totp.now() - print(f"Código de teste: {test_code}") - is_valid = admin.verify_otp(test_code) - print(f"Verificação do código: {'Sucesso' if is_valid else 'Falha'}") - - if not is_valid: - print("\nALERTA: Verificação do OTP falhou!") - print("Por favor, verifique se o segredo OTP está correto.") - - # Fazer commit final para garantir que tudo foi salvo - db.commit() - - except Exception as e: - db.rollback() - raise e - finally: - db.close() - - except Exception as e: - print(f"\nErro durante a execução: {e}") - import traceback - traceback.print_exc() - -if __name__ == "__main__": - create_admin_user() diff --git a/docker-compose.crossplatform.yml b/docker-compose.crossplatform.yml new file mode 100644 index 0000000..3e86872 --- /dev/null +++ b/docker-compose.crossplatform.yml @@ -0,0 +1,62 @@ +services: + # Redis Cache Service + redis: + image: redis:7-alpine + container_name: controles_redis + volumes: + - redis_data:/data + command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru + restart: unless-stopped + healthcheck: + test: [ "CMD", "redis-cli", "ping" ] + interval: 30s + timeout: 10s + retries: 3 + networks: + - controles_network + + # Flask Application + app: + build: . + container_name: controles_app + ports: + - "5000:5000" + environment: + - FLASK_APP=app.py + - FLASK_ENV=production + - REDIS_URL=redis://redis:6379/0 + - DATABASE_URL=sqlite:////data/database.db + + # DEV apenas para facilitar testes, não deve ser usado em produção + - ADMIN_OTP_SECRET=JBSWY3DPEHPK3PXP + + # Produção definir em .env + #- ADMIN_OTP_SECRET=${ADMIN_OTP_SECRET} + volumes: + - app_data:/data + - app_logs:/app/logs + read_only: true + tmpfs: + - /tmp + security_opt: + - no-new-privileges:true + cap_drop: + - ALL + depends_on: + redis: + condition: service_healthy + restart: unless-stopped + networks: + - controles_network + +volumes: + redis_data: + driver: local + app_data: + driver: local + app_logs: + driver: local + +networks: + controles_network: + driver: bridge diff --git a/docker-compose.yml b/docker-compose.yml index 4238e4b..598efbe 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,18 +1,14 @@ -version: '3.8' - services: # Redis Cache Service redis: image: redis:7-alpine container_name: controles_redis - ports: - - "6379:6379" volumes: - redis_data:/data command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru restart: unless-stopped healthcheck: - test: ["CMD", "redis-cli", "ping"] + test: [ "CMD", "redis-cli", "ping" ] interval: 30s timeout: 10s retries: 3 @@ -21,7 +17,11 @@ services: # Flask Application app: - build: . + build: + context: . + args: + APP_UID: ${APP_UID:-1000} + APP_GID: ${APP_GID:-1000} container_name: controles_app ports: - "5000:5000" @@ -29,11 +29,23 @@ services: - FLASK_APP=app.py - FLASK_ENV=production - REDIS_URL=redis://redis:6379/0 - - DATABASE_URL=sqlite:///app/database.db + - DATABASE_URL=sqlite:////data/database.db + + # DEV apenas para facilitar testes, não deve ser usado em produção - ADMIN_OTP_SECRET=JBSWY3DPEHPK3PXP + + # Produção definir em .env + #- ADMIN_OTP_SECRET=${ADMIN_OTP_SECRET} volumes: - - ./database.db:/app/database.db - - ./admin_qr.png:/app/admin_qr.png + - ./data:/data + - ./logs:/app/logs + read_only: true + tmpfs: + - /tmp + security_opt: + - no-new-privileges:true + cap_drop: + - ALL depends_on: redis: condition: service_healthy @@ -47,4 +59,4 @@ volumes: networks: controles_network: - driver: bridge \ No newline at end of file + driver: bridge diff --git a/docs/README.md b/docs/README.md index 2ab0ad8..cd8a11a 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,26 +2,41 @@ Sistema de gerenciamento para a Organização Comunista Internacionalista (OCI) com controle de militantes, cotas, pagamentos e materiais. -## 🚀 Status Atual +## Trilha Recomendada de Leitura -✅ **Sistema com Arquitetura de Permissões Corrigida** -- Aplicação Flask rodando com Docker -- Redis cache integrado e funcionando -- Banco de dados SQLite inicializado -- Usuário admin configurado com OTP -- 30 militantes de teste criados -- Estrutura organizacional completa -- **Sistema de permissões implementado no nível de dados** -- **Menus sempre visíveis, controle transparente** +1. `docs/architecture_summary.md` +2. `docs/rbac.md` +3. `docs/permission_strategy.md` +4. `docs/redis_cache_setup.md` +5. `docs/permission_fixes_summary.md` -## 🎯 Arquitetura de Permissões +## Índice por Tema -O sistema implementa uma estratégia de controle de permissões no **nível de dados**, garantindo que: +### Arquitetura -- **Menus permanecem sempre visíveis** - Não há restrições na interface -- **Dados são filtrados por hierarquia** - Admin → CC → CR → Setor → Célula -- **Templates nunca quebram** - Sempre renderizam, mesmo com dados vazios -- **Tesoureiros têm poder adequado** - Podem fazer tudo que secretários fazem +- `docs/architecture_summary.md`: visão geral do estado da arquitetura. +- `docs/mvc_refactoring.md`: detalhes da refatoração MVC. + +### Permissões e Segurança de Acesso + +- `docs/rbac.md`: níveis de papel e herança de permissões. +- `docs/permission_strategy.md`: estratégia de filtragem de dados e uso em templates. +- `docs/permission_fixes_summary.md`: resumo das correções aplicadas em permissões. + +### Infra e Performance + +- `docs/redis_cache_setup.md`: configuração e uso de cache Redis. + +### Histórico Técnico + +- `docs/alteracoes_db_connection.md`: alterações no gerenciamento de conexão/sessão de banco. + +## Como Manter Esta Pasta Organizada + +- Preferir um arquivo por assunto (evitar documentos muito amplos). +- Começar cada documento com contexto, problema e decisão. +- Registrar trade-offs e impactos de manutenção. +- Atualizar este índice sempre que um novo documento for criado. ### Diagrama da Arquitetura @@ -46,87 +61,6 @@ graph TD J --> K[Always Renders Successfully] ``` -## 🏗️ Arquitetura - -O sistema foi refatorado seguindo o padrão MVC (Model-View-Controller): - -``` -controles/ -├── app.py # Ponto de entrada da aplicação -├── controllers/ # Controladores (lógica de rotas) -├── models/ # Modelos (operações de banco) -├── services/ # Serviços (lógica de negócio) -├── templates/ # Views (templates HTML) -├── static/ # Assets estáticos -└── functions/ # Funções utilitárias -``` - -## 🐳 Docker Setup - -### Pré-requisitos -- Docker e Docker Compose instalados -- Porta 5000 disponível para a aplicação -- Porta 6379 disponível para Redis - -### Inicialização Rápida - -```bash -# Clonar o repositório -git clone -cd controles - -# Iniciar o ambiente completo -make dev-up - -# Verificar status -docker-compose ps - -# Ver logs -make docker-logs -``` - -### Comandos Úteis - -```bash -# Iniciar serviços -make dev-up - -# Parar serviços -make dev-down - -# Ver logs -make docker-logs - -# Status do cache Redis -make cache-status - -# Limpar cache -make cache-clear - -# Reconstruir containers -make docker-build -``` - -## 🔐 Acesso ao Sistema - -### Credenciais do Admin -- **URL**: http://localhost:5000 -- **Usuário**: admin -- **Senha**: admin123 -- **OTP Secret**: JBSWY3DPEHPK3PXP - -### Configuração OTP -1. Instale um aplicativo autenticador (Google Authenticator, Microsoft Authenticator) -2. Configure manualmente: - - Nome: admin - - Segredo: JBSWY3DPEHPK3PXP - - Tipo: TOTP - - Algoritmo: SHA1 - - Dígitos: 6 - - Intervalo: 30 segundos - -**OU** use o QR Code gerado em `/tmp/admin_qr.png` dentro do container. - ## 📊 Funcionalidades ### Gestão de Militantes @@ -152,80 +86,6 @@ make docker-build - Relatórios de vendas - Relatórios de pagamentos -## 🗄️ Banco de Dados - -### Estrutura -- **SQLite** com SQLAlchemy ORM -- **Redis** para cache de performance -- Migrações automáticas -- Dados de teste incluídos - -### Inicialização -O banco é inicializado automaticamente no primeiro startup com: -- 30 militantes de teste -- Estrutura organizacional completa -- Tipos de pagamento e materiais -- Usuário admin configurado - -## 🔧 Tecnologias - -- **Backend**: Flask 2.3.3 -- **Frontend**: Bootstrap 5, HTML5, CSS3, JavaScript -- **Database**: SQLite + SQLAlchemy 2.0.21 -- **Cache**: Redis 7.4.4 -- **Authentication**: Flask-Login + OTP (pyotp) -- **Container**: Docker + Docker Compose -- **Server**: Gunicorn - -## 📁 Estrutura de Arquivos - -``` -controles/ -├── app.py # Aplicação principal -├── controllers/ # Controladores MVC -│ ├── auth_controller.py # Autenticação -│ ├── home_controller.py # Dashboard -│ ├── militante_controller.py # Militantes -│ ├── pagamento_controller.py # Pagamentos -│ ├── cota_controller.py # Cotas -│ └── usuario_controller.py # Usuários -├── models/ # Modelos de dados -├── services/ # Serviços de negócio -├── templates/ # Templates HTML -├── static/ # Assets estáticos -├── functions/ # Funções utilitárias -├── docs/ # Documentação -├── docker-compose.yml # Configuração Docker -├── Dockerfile # Imagem Docker -└── requirements.txt # Dependências Python -``` - -## 🚨 Problemas Resolvidos - -### ✅ QR Code Admin -- **Problema**: Erro de permissão ao salvar QR code -- **Solução**: Múltiplos caminhos de fallback, salvamento em `/tmp/` - -### ✅ Conexão Redis -- **Problema**: Falhas de conexão durante startup -- **Solução**: Retry logic com backoff exponencial - -### ✅ Método OTP -- **Problema**: Método `generate_otp_secret` ausente -- **Solução**: Implementado na classe Usuario - -### ✅ Rede Docker -- **Problema**: Serviços não se comunicavam -- **Solução**: Configuração explícita de redes - -### ✅ Segredo OTP Inválido -- **Problema**: Segredo OTP não estava em formato base32 válido -- **Solução**: Alterado para `JBSWY3DPEHPK3PXP` (formato base32 válido) - -### ✅ Verificação de Arquivo QR Code -- **Problema**: `PermissionError` ao verificar existência do arquivo -- **Solução**: Removida verificação de existência, implementado sistema de fallback - ## 📈 Performance ### Cache Redis @@ -235,6 +95,7 @@ controles/ - API responses: Variável ### Monitoramento + ```bash # Status do cache make cache-status @@ -246,68 +107,6 @@ make docker-logs docker-compose logs redis ``` -## 🔍 Troubleshooting - -### Problemas Comuns - -1. **Redis não conecta** - ```bash - docker-compose logs redis - docker-compose restart redis - ``` - -2. **Aplicação não inicia** - ```bash - docker-compose logs app - docker-compose down && docker-compose up -d - ``` - -3. **Cache não funciona** - ```bash - make cache-status - make cache-clear - ``` - -4. **Erro de OTP** - ```bash - # Verificar se o segredo está correto - echo "JBSWY3DPEHPK3PXP" | base32 -d - ``` - -5. **Erro de permissão QR Code** - ```bash - # O QR code agora salva em /tmp/admin_qr.png - # Se não conseguir salvar, use configuração manual - ``` - -### Logs -- Aplicação: `logs/controles.log` -- Cache: `logs/cache.log` -- Docker: `docker-compose logs` - -## 📚 Documentação - -- [Arquitetura MVC](docs/mvc_refactoring.md) -- [Sistema RBAC](docs/rbac.md) -- [Cache Redis](docs/redis_cache_setup.md) -- [Resumo da Arquitetura](docs/architecture_summary.md) - -## 🤝 Contribuição - -1. Fork o projeto -2. Crie uma branch para sua feature -3. Commit suas mudanças -4. Push para a branch -5. Abra um Pull Request - -## 📄 Licença - -Este projeto é privado para uso da OCI. - -## 📞 Suporte - -Para suporte técnico, entre em contato com a equipe de desenvolvimento. - ## 📋 Recommended Next Steps ### High Priority diff --git a/functions/base.py b/functions/base.py index c7c4a00..f48f00d 100644 --- a/functions/base.py +++ b/functions/base.py @@ -1,16 +1,17 @@ +import os from sqlalchemy import create_engine, text from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from pathlib import Path # Configurar caminho do banco de dados -db_dir = Path.home() / '.local' / 'share' / 'controles' -db_dir.mkdir(parents=True, exist_ok=True) -db_path = db_dir / 'database.db' +db_path = Path(__file__).resolve().parents[1] / 'data' / 'database.db' +db_path.parent.mkdir(parents=True, exist_ok=True) +db_fallback = f'sqlite:///{db_path}' # Configurar SQLite com opções para melhor concorrência engine = create_engine( - f'sqlite:///{db_path}', + os.environ.get('DATABASE_URL', db_fallback), connect_args={ 'timeout': 30, # Tempo de espera em segundos 'check_same_thread': False # Permite acesso de múltiplas threads @@ -22,15 +23,15 @@ engine = create_engine( Session = sessionmaker(bind=engine) Base = declarative_base() -def get_db_connection(): +def get_db_session(): """Retorna uma nova sessão do banco de dados com PRAGMAs configuradas""" - session = Session() + db_session = Session() try: # Configurar SQLite para melhor tratamento de concorrência - session.execute(text("PRAGMA journal_mode=WAL")) - session.execute(text("PRAGMA busy_timeout=5000")) - return session + db_session.execute(text("PRAGMA journal_mode=WAL")) + db_session.execute(text("PRAGMA busy_timeout=5000")) + return db_session except Exception as e: - session.rollback() - session.close() + db_session.rollback() + db_session.close() raise e diff --git a/functions/controle.py b/functions/controle.py index dffd3e2..58d1b08 100644 --- a/functions/controle.py +++ b/functions/controle.py @@ -1,10 +1,10 @@ from datetime import datetime, UTC from sqlalchemy.exc import SQLAlchemyError -from functions.database import get_db_connection, Controle as ControleModel +from functions.database import get_db_session, Controle as ControleModel class Controle: def __init__(self): - self.db = get_db_connection() + self.db = get_db_session() def registrar_controle(self, militante_id: int, tipo: str, valor: float, observacao: str = None) -> bool: """ diff --git a/functions/database.py b/functions/database.py index e754890..f856223 100644 --- a/functions/database.py +++ b/functions/database.py @@ -1,8 +1,7 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, UTC from werkzeug.security import generate_password_hash, check_password_hash from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric, Date, Enum from sqlalchemy.orm import relationship, backref -import os import pyotp import secrets from flask_mail import Message @@ -10,23 +9,23 @@ from flask import url_for import enum from flask_login import UserMixin from .rbac import Role -from .base import Base, engine, get_db_connection +from .base import Base, get_db_session def execute_query(query, params=None): """ Executa uma query usando SQLAlchemy """ - session = get_db_connection() + db = get_db_session() try: - result = session.execute(query, params) - session.commit() + result = db.execute(query, params) + db.commit() return result except Exception as e: - session.rollback() + db.rollback() raise e finally: - session.close() + db.close() class EstadoMilitante(enum.Enum): ATIVO = 'ativo' @@ -149,7 +148,7 @@ class Militante(Base): quadro_orientador = Column(Boolean, default=False) # Campos para Aspirante aspirante = Column(Boolean, default=True) # Por padrão, todo novo militante é aspirante - data_inicio_aspirante = Column(DateTime, default=datetime.utcnow) + data_inicio_aspirante = Column(DateTime, default=datetime.now(UTC)) avaliacao_aspirante = Column(Text) data_avaliacao_aspirante = Column(DateTime) @@ -252,7 +251,7 @@ class Militante(Base): def generate_username(self): """Gera um nome de usuário único baseado no primeiro nome e um código""" from sqlalchemy import func - db = get_db_connection() + db = get_db_session() try: # Pega o primeiro nome primeiro_nome = self.nome.split()[0].lower() @@ -429,7 +428,7 @@ class Usuario(Base, UserMixin): celula_id = Column(Integer, ForeignKey('celulas.id')) session_timeout = Column(Integer, default=30) tipo = Column(String(17), nullable=False) - ultima_atividade = Column(DateTime, default=datetime.utcnow) + ultima_atividade = Column(DateTime, default=datetime.now(UTC)) # Relacionamento com militante militante_id = Column(Integer, ForeignKey('militantes.id')) militante = relationship("Militante", backref=backref("usuario", uselist=False)) @@ -448,7 +447,7 @@ class Usuario(Base, UserMixin): self.ativo = True self.session_timeout = 30 self.tipo = "USUARIO" - self.ultima_atividade = datetime.utcnow() + self.ultima_atividade = datetime.now(UTC) def set_password(self, password): self.password_hash = generate_password_hash(password) @@ -457,23 +456,24 @@ class Usuario(Base, UserMixin): return check_password_hash(self.password_hash, password) def update_last_activity(self): - self.ultima_atividade = datetime.utcnow() + self.ultima_atividade = datetime.now(UTC) def is_session_expired(self): if not self.ultima_atividade: return True - time_diff = datetime.utcnow() - self.ultima_atividade + time_diff = datetime.now(UTC) - self.ultima_atividade return time_diff.total_seconds() > (self.session_timeout * 60) def check_session_timeout(self): """Verifica se a sessão do usuário expirou""" if not self.ultima_atividade: return True - time_diff = datetime.utcnow() - self.ultima_atividade + time_diff = datetime.now(UTC) - self.ultima_atividade return time_diff.total_seconds() > (self.session_timeout * 60) def has_permission(self, permission_name): """Verifica se o usuário tem uma permissão específica""" + # TODO: (talvez) remover, confirmar admin por RBAC if self.is_admin: # Se for admin, tem todas as permissões return True @@ -485,54 +485,66 @@ class Usuario(Base, UserMixin): return False def has_role(self, role_nivel): - """Verifica se o usuário tem um determinado nível de role""" + """Verifica se o usuário tem um nível de role específico.""" for role in self.roles: if role.nivel == role_nivel: return True return False - def get_otp_uri(self): - """Gera a URI para autenticação em duas etapas""" - if not self.otp_secret: - self.otp_secret = pyotp.random_base32() - return pyotp.totp.TOTP(self.otp_secret).provisioning_uri( - self.username, - issuer_name="Sistema de Controles" - ) + def get_highest_role(self): + """Retorna a role de maior nível do usuário.""" + if not self.roles: + return None + return max(self.roles, key=lambda role: role.nivel) + + def has_minimum_role(self, min_level): + """Verifica se o usuário possui ao menos o nível informado.""" + highest_role = self.get_highest_role() + return bool(highest_role and highest_role.nivel >= min_level) def generate_otp_secret(self): """Gera um novo segredo OTP para o usuário""" self.otp_secret = pyotp.random_base32() return self.otp_secret + def get_otp_uri(self): + """Gera a URI para autenticação em duas etapas""" + if not self.otp_secret: + raise ValueError(f"OTP não configurado para {self.username}") + + totp = pyotp.TOTP(self.otp_secret) + return totp.provisioning_uri( + name=self.username, + issuer_name="Sistema de Controles" + ) + def verify_otp(self, code): """Verifica se um código OTP é válido""" if not self.otp_secret: - print(f"Erro: OTP secret não configurado para o usuário {self.username}") - return False + raise ValueError(f"Erro: OTP secret não configurado para o usuário {self.username}") print(f"Verificando OTP para usuário {self.username}") print(f"OTP Secret: {self.otp_secret}") print(f"Código fornecido: {code}") - totp = pyotp.totp.TOTP(self.otp_secret) + totp = pyotp.TOTP(self.otp_secret) is_valid = totp.verify(code) print(f"Resultado da verificação: {'Válido' if is_valid else 'Inválido'}") - print(f"Tempo atual: {datetime.utcnow()}") - print(f"Período atual: {totp.timecode(datetime.utcnow())}") + print(f"Tempo atual: {datetime.now(UTC)}") + print(f"Período atual: {totp.timecode(datetime.now(UTC))}") return is_valid def logout(self): """Registra o logout do usuário""" - self.ultimo_logout = datetime.utcnow() + self.ultimo_logout = datetime.now(UTC) self.motivo_logout = "Logout manual" self.ultima_atividade = None def is_admin_user(self): """Verifica se o usuário é admin""" - return self.is_admin or any(role.nome == "admin" for role in self.roles) + return self.is_admin or any(role.nivel == Role.SECRETARIO_GERAL for role in self.roles) class PagamentoCelula(Base): __tablename__ = 'pagamentos_celula' @@ -605,116 +617,3 @@ class TransacaoPIX(Base): pagamento_id = Column(Integer, ForeignKey('pagamentos.id')) pagamento = relationship("Pagamento", back_populates="transacoes_pix") - -def init_database(): - """Inicializa o banco de dados com dados básicos""" - print("Inicializando banco de dados...") - - session = get_db_connection() - try: - # Criar todas as tabelas - Base.metadata.drop_all(engine) # Remover todas as tabelas existentes - Base.metadata.create_all(engine) - - # Criar roles padrão - roles = [ - ("Administrador", Role.SECRETARIO_GERAL), - ("Secretário", Role.SECRETARIO_CELULA), - ("Militante", Role.MILITANTE_BASICO) - ] - - for nome, nivel in roles: - if not session.query(Role).filter_by(nome=nome).first(): - role = Role(nome=nome, nivel=nivel) - session.add(role) - session.commit() - - # Criar setores padrão - setores = ["Setor 1", "Setor 2", "Setor 3"] - for nome in setores: - if not session.query(Setor).filter_by(nome=nome).first(): - setor = Setor(nome=nome) - session.add(setor) - session.commit() - - # Criar comitês padrão - comites = ["Comitê 1", "Comitê 2", "Comitê 3"] - for nome in comites: - if not session.query(ComiteCentral).filter_by(nome=nome).first(): - comite = ComiteCentral(nome=nome) - session.add(comite) - session.commit() - - # Gerar OTP para admin - admin_otp_secret = os.environ.get('ADMIN_OTP_SECRET') or pyotp.random_base32() - print(f"OTP do admin: {admin_otp_secret}") - - # Criar usuário admin - admin_role = session.query(Role).filter_by(nome="Administrador").first() - setor = session.query(Setor).first() - - admin = Usuario( - username="admin", - email="admin@example.com", - is_admin=True - ) - admin.set_password("admin123") - admin.tipo = "ADMIN" - admin.otp_secret = admin_otp_secret - admin.roles.append(admin_role) - admin.setor = setor - session.add(admin) - session.commit() - - # Gerar QR code - totp = pyotp.totp.TOTP(admin_otp_secret) - provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles") - - import qrcode - qr = qrcode.QRCode(version=1, box_size=10, border=5) - qr.add_data(provisioning_uri) - qr.make(fit=True) - img = qr.make_image(fill_color="black", back_color="white") - - # Tentar salvar em diferentes locais - qr_paths = ['/tmp/admin_qr.png', 'admin_qr.png', '/app/admin_qr.png'] - qr_saved = False - - for qr_path in qr_paths: - try: - img.save(qr_path) - print(f"QR code salvo em {qr_path}") - qr_saved = True - break - except Exception as e: - print(f"Não foi possível salvar o QR code em {qr_path}: {e}") - continue - - if not qr_saved: - print("AVISO: Não foi possível salvar o QR code em nenhum local") - print("O QR code pode ser gerado manualmente usando o URI OTP") - - print("=== Usuário Admin Criado ===") - print(f"Username: admin") - print(f"Senha: admin123") - print(f"Email: {admin.email}") - print(f"OTP Secret: {admin_otp_secret}") - if qr_saved: - print(f"QR Code: {qr_path}") - print(f"URI OTP: {provisioning_uri}") - - # Importar e executar o seed após criar todas as dependências - from seed_data import seed_database - print("\nPopulando banco de dados com dados de teste...") - seed_database() - print("Dados de teste criados com sucesso!") - - except Exception as e: - print(f"Erro na inicialização do banco: {e}") - session.rollback() - raise - finally: - session.close() - -if __name__ == "__main__": - init_database() diff --git a/functions/decorators.py b/functions/decorators.py index c2992c1..879a453 100644 --- a/functions/decorators.py +++ b/functions/decorators.py @@ -2,7 +2,7 @@ from functools import wraps from flask import session, redirect, url_for, flash from flask_login import current_user, login_required from sqlalchemy.orm import joinedload -from .database import get_db_connection, Usuario, Role +from .database import get_db_session, Usuario, Role from .rbac import Permission def require_login(f): @@ -26,7 +26,7 @@ def require_permission(permission_name): flash('Você precisa estar logado para acessar esta página.', 'error') return redirect(url_for('auth.login')) - db = get_db_connection() + db = get_db_session() try: # Carregar o usuário com suas roles e permissões user = db.query(Usuario).options( @@ -58,8 +58,11 @@ def require_permission(permission_name): return decorated_function return decorator -def require_role(role_name): +def require_role(role_level): """Decorador para verificar se o usuário tem um papel específico""" + if not isinstance(role_level, int): + raise TypeError("require_role espera um nível numérico (int), use a classe Role.") + def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): @@ -67,10 +70,10 @@ def require_role(role_name): flash('Você precisa estar logado para acessar esta página.', 'error') return redirect(url_for('auth.login')) - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(current_user.id) - if not user or not user.has_role(role_name): + if not user or not user.has_role(role_level): flash('Você não tem permissão para acessar esta página.', 'error') return redirect(url_for('index')) @@ -86,6 +89,9 @@ def require_role(role_name): def require_minimum_role(min_level): """Decorador para verificar se o usuário tem um papel com nível mínimo""" + if not isinstance(min_level, int): + raise TypeError("require_minimum_role espera um nível numérico de role (int).") + def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): @@ -93,15 +99,14 @@ def require_minimum_role(min_level): flash('Você precisa estar logado para acessar esta página.', 'error') return redirect(url_for('auth.login')) - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(current_user.id) if not user: flash('Usuário não encontrado.', 'error') return redirect(url_for('auth.login')) - highest_role = user.get_highest_role() - if not highest_role or highest_role.nivel < min_level: + if not user.has_minimum_role(min_level): flash('Você não tem permissão para acessar esta página.', 'error') return redirect(url_for('index')) @@ -146,31 +151,42 @@ def require_instance_access(instance_type, instance_id): if not current_user.is_authenticated: flash('Por favor, faça login para acessar esta página.', 'error') return redirect(url_for('auth.login')) - - # Verificar acesso baseado na instância do usuário - if instance_type == 'celula': - if not (current_user.celula_id == instance_id or - current_user.has_permission(Permission.VIEW_SECTOR_REPORTS) or - current_user.has_permission(Permission.VIEW_CR_REPORTS) or - current_user.has_permission(Permission.VIEW_CC_REPORTS)): - flash('Você não tem acesso a esta célula.', 'error') - return redirect(url_for('index')) - elif instance_type == 'setor': - if not (current_user.setor_id == instance_id or - current_user.has_permission(Permission.VIEW_CR_REPORTS) or - current_user.has_permission(Permission.VIEW_CC_REPORTS)): - flash('Você não tem acesso a este setor.', 'error') - return redirect(url_for('index')) - elif instance_type == 'cr': - if not (current_user.cr_id == instance_id or - current_user.has_permission(Permission.VIEW_CC_REPORTS)): - flash('Você não tem acesso a este CR.', 'error') - return redirect(url_for('index')) - - # Atualiza timestamp da última atividade - current_user.update_last_activity() - db_session.commit() - - return f(*args, **kwargs) + + db = get_db_session() + try: + user = db.query(Usuario).options( + joinedload(Usuario.roles).joinedload(Role.permissions) + ).get(current_user.id) + if not user: + flash('Usuário não encontrado.', 'error') + return redirect(url_for('auth.login')) + + # Verificar acesso baseado na instância do usuário + if instance_type == 'celula': + if not (user.celula_id == instance_id or + user.has_permission(Permission.VIEW_SECTOR_REPORTS) or + user.has_permission(Permission.VIEW_CR_REPORTS) or + user.has_permission(Permission.VIEW_CC_REPORTS)): + flash('Você não tem acesso a esta célula.', 'error') + return redirect(url_for('index')) + elif instance_type == 'setor': + if not (user.setor_id == instance_id or + user.has_permission(Permission.VIEW_CR_REPORTS) or + user.has_permission(Permission.VIEW_CC_REPORTS)): + flash('Você não tem acesso a este setor.', 'error') + return redirect(url_for('index')) + elif instance_type == 'cr': + if not (user.cr_id == instance_id or + user.has_permission(Permission.VIEW_CC_REPORTS)): + flash('Você não tem acesso a este CR.', 'error') + return redirect(url_for('index')) + + # Atualiza timestamp da última atividade + user.update_last_activity() + db.commit() + + return f(*args, **kwargs) + finally: + db.close() return decorated_function - return decorator \ No newline at end of file + return decorator diff --git a/functions/rbac.py b/functions/rbac.py index 21baf57..c532e88 100644 --- a/functions/rbac.py +++ b/functions/rbac.py @@ -133,183 +133,183 @@ class Permission(Base): def init_rbac(): """Inicializa o sistema RBAC com roles e permissões básicas""" - from .database import Usuario, get_db_connection - session = get_db_connection() + from .database import Usuario, get_db_session + db = get_db_session() try: # Criar role de administrador primeiro - admin_role = session.query(Role).filter_by(nome="Administrador").first() + admin_role = db.query(Role).filter_by(nome="Administrador").first() if not admin_role: admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL) - session.add(admin_role) - session.commit() + db.add(admin_role) + db.commit() # Criar outras roles for nivel, nome in Role.get_roles_list(): if nome != "Administrador": # Pular Administrador pois já foi criado - role = session.query(Role).filter_by(nivel=nivel).first() + role = db.query(Role).filter_by(nivel=nivel).first() if not role: role = Role(nome=nome, nivel=nivel) - session.add(role) + db.add(role) # Criar permissões for nome, descricao in Permission.get_permissions_list(): - permission = session.query(Permission).filter_by(nome=nome).first() + permission = db.query(Permission).filter_by(nome=nome).first() if not permission: permission = Permission(nome=nome, descricao=descricao) - session.add(permission) + db.add(permission) - session.commit() + db.commit() # Dar todas as permissões para o admin - all_permissions = session.query(Permission).all() + all_permissions = db.query(Permission).all() admin_role.permissions = all_permissions - session.commit() + db.commit() # Buscar usuário admin e atribuir role de administrador - admin_user = session.query(Usuario).filter_by(username="admin").first() + admin_user = db.query(Usuario).filter_by(username="admin").first() if admin_user: if admin_role not in admin_user.roles: admin_user.roles = [admin_role] # Substituir roles existentes - session.commit() + db.commit() # Mapear permissões para outros roles - for role in session.query(Role).filter(Role.nome != "Administrador").all(): + for role in db.query(Role).filter(Role.nome != "Administrador").all(): # Militante Básico if role.nivel == Role.MILITANTE_BASICO: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first() ] # Secretário de Célula elif role.nivel == Role.SECRETARIO_CELULA: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_CELL_PAYMENT).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_CELL_PAYMENT).first() ] # Membro de Setor elif role.nivel == Role.MEMBRO_SETOR: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first() ] # Secretário de Setor elif role.nivel == Role.SECRETARIO_SETOR: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first() ] # Membro de CR elif role.nivel == Role.MEMBRO_CR: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first() ] # Secretário de CR elif role.nivel == Role.SECRETARIO_CR: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first() ] # Membro do CC elif role.nivel == Role.MEMBRO_CC: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first() ] # Secretário Geral elif role.nivel == Role.SECRETARIO_GERAL: role.permissions = [ - session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(), - session.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(), - session.query(Permission).filter_by(nome=Permission.MANAGE_CC_CRS).first(), - session.query(Permission).filter_by(nome=Permission.CREATE_CC_CR).first(), - session.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first(), - session.query(Permission).filter_by(nome=Permission.SYSTEM_CONFIG).first() + db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(), + db.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(), + db.query(Permission).filter_by(nome=Permission.MANAGE_CC_CRS).first(), + db.query(Permission).filter_by(nome=Permission.CREATE_CC_CR).first(), + db.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first(), + db.query(Permission).filter_by(nome=Permission.SYSTEM_CONFIG).first() ] - session.commit() + db.commit() except Exception as e: print(f"Erro ao inicializar RBAC: {e}") - session.rollback() + db.rollback() raise finally: - session.close() \ No newline at end of file + db.close() \ No newline at end of file diff --git a/init_db.py b/init_db.py deleted file mode 100644 index d7802f0..0000000 --- a/init_db.py +++ /dev/null @@ -1,19 +0,0 @@ -from functions.database import init_database -from functions.rbac import init_rbac -from create_admin import create_admin_user -from create_test_users import create_test_users - -def init_system(): - print("Inicializando banco de dados...") - init_database() - - print("Inicializando sistema RBAC...") - init_rbac() - - print("Criando usuários iniciais...") - create_admin_user() - create_test_users() - -if __name__ == "__main__": - init_system() - print("Sistema inicializado com sucesso!") \ No newline at end of file diff --git a/init_system.py b/init_system.py deleted file mode 100644 index 7827c30..0000000 --- a/init_system.py +++ /dev/null @@ -1,58 +0,0 @@ -from create_admin import create_admin -from create_test_users import create_test_users -from functions.database import get_db_connection, Usuario -from functions.rbac import Role - -def init_system(): - print("=== Inicializando Sistema ===") - - # Criar admin - print("\nCriando usuário admin...") - create_admin() - - # Criar usuários de teste - print("\nCriando usuários de teste...") - create_test_users() - - # Verificar configuração - print("\n=== Verificando Configuração ===") - session = get_db_connection() - try: - # Verificar admin - admin = session.query(Usuario).filter_by(username='admin').first() - if admin: - print("Admin: OK") - print(f"OTP configurado: {'Sim' if admin.otp_secret else 'Não'}") - else: - print("Admin: FALHOU") - - # Verificar usuários de teste - test_users = ['aligner', 'tester', 'deployer'] - for username in test_users: - user = session.query(Usuario).filter_by(username=username).first() - if user: - print(f"{username}: OK") - print(f"OTP configurado: {'Sim' if user.otp_secret else 'Não'}") - else: - print(f"{username}: FALHOU") - - print("\n=== Instruções ===") - print("1. Use o aplicativo autenticador para configurar o OTP de cada usuário") - print("2. Faça login com cada usuário para testar") - print("3. Altere a senha no primeiro login") - print("\nCredenciais:") - print("Admin:") - print(" Usuário: admin") - print(" Senha: admin123") - print("\nUsuários de teste:") - print(" Usuário: aligner, tester, deployer") - print(" Senha: Test123!@#") - - except Exception as e: - print(f"Erro ao verificar configuração: {str(e)}") - session.rollback() - finally: - session.close() - -if __name__ == "__main__": - init_system() \ No newline at end of file diff --git a/models/militante_model.py b/models/militante_model.py index 2f32945..964b9e0 100644 --- a/models/militante_model.py +++ b/models/militante_model.py @@ -1,4 +1,4 @@ -from functions.database import get_db_connection, Militante, EmailMilitante, Endereco +from functions.database import get_db_session, Militante, EmailMilitante, Endereco from sqlalchemy.orm import joinedload from datetime import datetime from typing import List, Dict, Optional @@ -14,7 +14,7 @@ class MilitanteModel: @invalidate_cache_pattern("militantes:*") def criar_militante(data: Dict) -> Dict: """Cria um novo militante""" - db = get_db_connection() + db = get_db_session() try: # Criar endereço se fornecido endereco_id = None @@ -89,7 +89,7 @@ class MilitanteModel: @cached(expire=1800, key_prefix="militantes") # Cache for 30 minutes def listar_militantes() -> List[Militante]: """Lista todos os militantes""" - db = get_db_connection() + db = get_db_session() try: militantes = db.query(Militante).options( joinedload(Militante.emails), @@ -125,7 +125,7 @@ class MilitanteModel: return cached_militante # Cache miss, get from database - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).options( joinedload(Militante.emails), @@ -150,7 +150,7 @@ class MilitanteModel: @invalidate_cache_pattern("militantes:*") def atualizar_militante(militante_id: int, data: Dict) -> Dict: """Atualiza um militante existente""" - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).get(militante_id) @@ -228,7 +228,7 @@ class MilitanteModel: @invalidate_cache_pattern("militantes:*") def excluir_militante(militante_id: int) -> Dict: """Exclui um militante""" - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).get(militante_id) if not militante: @@ -265,7 +265,7 @@ class MilitanteModel: @cached(expire=1800, key_prefix="militantes") def buscar_por_cpf(cpf: str) -> Optional[Militante]: """Busca um militante por CPF""" - db = get_db_connection() + db = get_db_session() try: militante = db.query(Militante).filter_by(cpf=cpf).first() if militante: diff --git a/models/pagamento_model.py b/models/pagamento_model.py index e924a3f..898996e 100644 --- a/models/pagamento_model.py +++ b/models/pagamento_model.py @@ -1,4 +1,4 @@ -from functions.database import get_db_connection, Pagamento, Militante, TipoPagamento +from functions.database import get_db_session, Pagamento, Militante, TipoPagamento from sqlalchemy.orm import joinedload from datetime import datetime from typing import List, Dict, Optional @@ -9,7 +9,7 @@ class PagamentoModel: @staticmethod def criar_pagamento(data: Dict) -> Dict: """Cria um novo pagamento""" - db = get_db_connection() + db = get_db_session() try: pagamento = Pagamento( militante_id=data['militante_id'], @@ -38,7 +38,7 @@ class PagamentoModel: @staticmethod def listar_pagamentos() -> List[Pagamento]: """Lista todos os pagamentos""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).all() finally: @@ -47,7 +47,7 @@ class PagamentoModel: @staticmethod def buscar_por_id(pagamento_id: int) -> Optional[Pagamento]: """Busca um pagamento por ID""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).get(pagamento_id) finally: @@ -56,7 +56,7 @@ class PagamentoModel: @staticmethod def atualizar_pagamento(pagamento_id: int, data: Dict) -> Dict: """Atualiza um pagamento existente""" - db = get_db_connection() + db = get_db_session() try: pagamento = db.query(Pagamento).get(pagamento_id) @@ -90,7 +90,7 @@ class PagamentoModel: @staticmethod def excluir_pagamento(pagamento_id: int) -> Dict: """Exclui um pagamento""" - db = get_db_connection() + db = get_db_session() try: pagamento = db.query(Pagamento).get(pagamento_id) if not pagamento: @@ -119,7 +119,7 @@ class PagamentoModel: @staticmethod def listar_por_celula(celula_id: int) -> List[Pagamento]: """Lista pagamentos de uma célula específica""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).filter_by(celula_id=celula_id).all() finally: @@ -128,7 +128,7 @@ class PagamentoModel: @staticmethod def listar_por_setor(setor_id: int) -> List[Pagamento]: """Lista pagamentos de um setor específico""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all() finally: @@ -137,7 +137,7 @@ class PagamentoModel: @staticmethod def listar_por_cr(cr_id: int) -> List[Pagamento]: """Lista pagamentos de um CR específico""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all() finally: @@ -146,7 +146,7 @@ class PagamentoModel: @staticmethod def listar_por_militante(militante_id: int) -> List[Pagamento]: """Lista pagamentos de um militante específico""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).filter_by(militante_id=militante_id).order_by(Pagamento.data_pagamento.desc()).all() finally: @@ -155,7 +155,7 @@ class PagamentoModel: @staticmethod def obter_tipos_pagamento() -> List[TipoPagamento]: """Obtém todos os tipos de pagamento""" - db = get_db_connection() + db = get_db_session() try: return db.query(TipoPagamento).order_by(TipoPagamento.descricao).all() finally: @@ -164,7 +164,7 @@ class PagamentoModel: @staticmethod def obter_militantes() -> List[Militante]: """Obtém todos os militantes""" - db = get_db_connection() + db = get_db_session() try: return db.query(Militante).order_by(Militante.nome).all() finally: diff --git a/routes/admin.py b/routes/admin.py index 9cfe7d1..f241504 100644 --- a/routes/admin.py +++ b/routes/admin.py @@ -1,5 +1,5 @@ from flask import Blueprint, render_template, flash, redirect, url_for, request, jsonify -from functions.database import Usuario, get_db_connection +from functions.database import Usuario, get_db_session from functions.decorators import require_login from flask_login import login_required, current_user from sqlalchemy.orm import joinedload @@ -29,7 +29,7 @@ def admin_required(f): @admin_required def dashboard(): """Dashboard principal da área administrativa com lista de usuários""" - db = get_db_connection() + db = get_db_session() try: now = datetime.now() @@ -68,7 +68,7 @@ def dashboard(): @admin_required def reset_user_otp(user_id): """Reseta o OTP de um usuário""" - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(user_id) if not user: @@ -89,7 +89,7 @@ def reset_user_otp(user_id): @admin_required def reset_user_password(user_id): """Reseta a senha de um usuário""" - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(user_id) if not user: @@ -111,7 +111,7 @@ def reset_user_password(user_id): @admin_required def toggle_user_status(user_id): """Ativa/desativa um usuário""" - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(user_id) if not user: diff --git a/scripts/create_admin.py b/scripts/create_admin.py new file mode 100644 index 0000000..d7c0d47 --- /dev/null +++ b/scripts/create_admin.py @@ -0,0 +1,198 @@ +import os +import pyotp +from pathlib import Path +from functions.database import Usuario, Role, get_db_session +from services.otp_service import generate_qr_code + +ADMIN_USERNAME = "admin" +ADMIN_PASSWORD = "admin123" +ADMIN_ROLE = Role.SECRETARIO_GERAL + + +def salvar_qr_code(user): + """ + Gera o QR code para um usuário específico + + Args: + user: Instância do modelo Usuario + + Returns: + tuple: (caminho do arquivo, URI do OTP) + """ + # Tentar diferentes caminhos para salvar o QR code + qr_paths = [ + Path('/tmp/admin_qr.png'), # Diretório temporário do sistema + Path('/data/admin_qr.png'), # Diretório de dados do container + Path('admin_qr.png') # Diretório atual (fallback fora do container) + ] + + # Tentar salvar em diferentes locais + qr_saved = False + saved_path = None + + img = generate_qr_code(user) # Gera o QR code para o usuário + + for qr_path in qr_paths: + try: + # Tentar salvar o arquivo + img.save(qr_path) + qr_saved = True + saved_path = qr_path + break + except Exception as e: + print(f"Não foi possível salvar o QR code em {qr_path}: {e}") + continue + + if not qr_saved: + print("AVISO: Não foi possível salvar o QR code em nenhum local") + print("O QR code pode ser gerado manualmente usando o URI OTP") + saved_path = None + + return saved_path + + +def _ensure_admin_role(db, admin_user, role): + admin_role = db.query(Role).filter_by(nome=role).first() + if admin_role is None: + admin_role = Role(nome=role, nivel=Role.SECRETARIO_GERAL) + db.add(admin_role) + db.flush() + + if admin_role not in admin_user.roles: + admin_user.roles.append(admin_role) + db.flush() + + +def _ensure_admin_otp(db, admin_user): + if admin_user.otp_secret: + return False + secret = (os.environ.get('ADMIN_OTP_SECRET') or "").strip() + admin_user.otp_secret = secret or admin_user.generate_otp_secret() + db.flush() + return True + + +def create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE, save_qr=True): + """Limpa e cria o usuário admin""" + db = get_db_session() + try: + # Verificar se já existe um usuário admin + admin_user = db.query(Usuario).filter_by(username=username).first() + if admin_user is not None: + db.delete(admin_user) + db.flush() + + print("\n=== Criando Novo Usuário Admin ===") + admin_user = Usuario( + username=username, + email="admin@example.com", + is_admin=True + ) + admin_user.set_password(password) + _ensure_admin_otp(db, admin_user) + _ensure_admin_role(db, admin_user, role) + + db.add(admin_user) + db.commit() + + qr_path = salvar_qr_code(admin_user) + + # Mostrar informações + print("\n=== Informações do Admin ===") + print(f"Username: {admin_user.username}") + print(f"Email: {admin_user.email}") + print(f"Senha: {password}") + print(f"Segredo OTP: {admin_user.otp_secret}") + print(f"URI do OTP: {admin_user.get_otp_uri()}") + if qr_path: + print(f"QR Code salvo em: {qr_path}") + else: + print("QR Code não foi salvo. Use o URI do OTP ou o Segredo OTP para configuração manual.") + + print("\n=== Instruções para Configuração ===") + print("1. Instale um aplicativo autenticador no seu celular") + print(" (Google Authenticator, Microsoft Authenticator, etc)") + print("2. Abra o aplicativo") + print("3. Selecione a opção para adicionar uma nova conta") + if qr_path: + print("4. Escaneie o QR Code salvo em:", qr_path) + print("\nOU configure manualmente:") + print(f"- Nome da conta: {admin_user.username}") + print(f"- Segredo OTP: {admin_user.otp_secret}") + print("- Tipo: Baseado em tempo (TOTP)") + print("- Algoritmo: SHA1") + print("- Dígitos: 6") + print("- Intervalo: 30 segundos") + + # Gerar código atual para verificação + totp = pyotp.TOTP(admin_user.otp_secret) + current_code = totp.now() + print("\n=== Verificação do OTP ===") + print(f"Código OTP atual: {current_code}") + is_valid = admin_user.verify_otp(current_code) + print(f"Verificação do código: {'Sucesso' if is_valid else 'Falha'}") + + if not is_valid: + print("\nALERTA: Verificação do OTP falhou!") + print("Por favor, verifique se o segredo OTP está correto.") + + # Fazer commit final para garantir que tudo foi salvo + db.commit() + + except Exception as e: + db.rollback() + raise e + finally: + db.close() + + +def verify_admin(username=ADMIN_USERNAME, role=ADMIN_ROLE, save_qr=False): + """Verifica se o usuário admin existe e tem OTP configurado""" + db = get_db_session() + try: + admin_user = db.query(Usuario).filter_by(username=username).first() + if admin_user is not None: + print("\n=== Usuário Admin Encontrado ===") + _ensure_admin_otp(db, admin_user) + _ensure_admin_role(db, admin_user, role) + return True + else: + print("\n=== Usuário Admin NÃO Encontrado ===") + return False + + except Exception as e: + print(f"Erro ao verificar o usuário admin: {e}") + raise + finally: + db.close() + + +def rotate_admin_otp(username=ADMIN_USERNAME, save_qr=False): + db = get_db_session() + try: + admin_user = db.query(Usuario).filter_by(username=username).first() + if admin_user is None: + print("Usuário admin não encontrado") + return False + + admin_user.generate_otp_secret() + db.commit() + print(f"OTP do usuário '{username}' foi rotacionado.") + print(f"Novo segredo OTP: {admin_user.otp_secret}") + + if save_qr: + qr_path = salvar_qr_code(admin_user) + if qr_path: + print(f"Novo QR code salvo em: {qr_path}") + else: + print("Não foi possível salvar o QR code automaticamente.") + + except Exception: + db.rollback() + raise + finally: + db.close() + + +if __name__ == "__main__": + create_admin() diff --git a/create_test_users.py b/scripts/create_test_users.py similarity index 94% rename from create_test_users.py rename to scripts/create_test_users.py index 5774f8c..4ec4b1e 100644 --- a/create_test_users.py +++ b/scripts/create_test_users.py @@ -1,9 +1,9 @@ -from functions.database import get_db_connection, Usuario, Role +from functions.database import get_db_session, Usuario, Role from werkzeug.security import generate_password_hash def create_test_users(): """Cria usuários de teste""" - db = get_db_connection() + db = get_db_session() try: # Lista de usuários de teste test_users = [ diff --git a/scripts/init_db.py b/scripts/init_db.py deleted file mode 100644 index 774bcf7..0000000 --- a/scripts/init_db.py +++ /dev/null @@ -1,27 +0,0 @@ -from functions.database import Role, Permissao, RolePermissao, Base, engine -from sqlalchemy.orm import Session - -def init_db(): - Base.metadata.create_all(engine) - - with Session(engine) as session: - # Criar roles - admin = Role(nome='Administrador', nivel=1) - coord = Role(nome='Coordenador', nivel=2) - milit = Role(nome='Militante', nivel=3) - - # Criar permissões - perm_admin = Permissao(nome='admin', descricao='Acesso total') - perm_militantes = Permissao(nome='ver_militantes', descricao='Ver militantes') - # ... outras permissões ... - - session.add_all([admin, coord, milit, perm_admin, perm_militantes]) - session.commit() - - # Associar permissões aos roles - session.add(RolePermissao(role=admin, permissao=perm_admin)) - session.add(RolePermissao(role=coord, permissao=perm_militantes)) - session.commit() - -if __name__ == '__main__': - init_db() \ No newline at end of file diff --git a/scripts/manage.py b/scripts/manage.py new file mode 100644 index 0000000..1ff742a --- /dev/null +++ b/scripts/manage.py @@ -0,0 +1,92 @@ +import argparse +import sys +from pathlib import Path +from dotenv import load_dotenv + +ROOT_DIR = Path(__file__).resolve().parents[1] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +# Carregar .env antes de importar módulos +load_dotenv(ROOT_DIR / ".env") + +from functions.base import Base, engine, get_db_session +from functions.rbac import Role, init_rbac +from scripts.create_admin import create_admin, rotate_admin_otp +from scripts.create_test_users import create_test_users +from scripts.seed_database import seed_database + + +ADMIN_USERNAME = "admin" +ADMIN_PASSWORD = "admin123" +ADMIN_ROLE = Role.SECRETARIO_GERAL + +def reset_db(args): + """Inicializando banco de dados e criando tabelas""" + db = get_db_session() + try: + # Criar todas as tabelas + Base.metadata.drop_all(engine) + Base.metadata.create_all(engine) + + except Exception as e: + print(f"Erro na drop ou create all da Base: {e}") + db.rollback() + raise + finally: + db.close() + + print("Inicializando sistema RBAC...") + init_rbac() + + print("Cria usuário admin...") + create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE) + + print("Banco inicializado com sucesso.") + return 0 + +def seed_db_with_fakes(args): + """Função para popular o banco com dados fake para desenvolvimento""" + seed_database() + +def seed_db_test_users(args): + """Função para popular o banco com dados fake para desenvolvimento""" + create_test_users() + +def reset_admin(args): + create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE) + +def rotate_admin_otp_cmd(args): + rotate_admin_otp(username=ADMIN_USERNAME, save_qr=True) + + +def build_parser(): + parser = argparse.ArgumentParser(description="Gerenciador de comandos do sistema Controles") + subparsers = parser.add_subparsers(dest="command", required=True) + + db_reset_parser = subparsers.add_parser("db_reset", help="Reseta o banco e recria tabelas, RBAC e admin") + db_reset_parser.set_defaults(func=reset_db) + + db_seed_fake_parser = subparsers.add_parser("db_seed_fake", help="Adiciona dados falsos para desenvolvimento") + db_seed_fake_parser.set_defaults(func=seed_db_with_fakes) + + db_seed_test_users_parser = subparsers.add_parser("db_seed_test_users", help="Adiciona usuários de teste para desenvolvimento") + db_seed_test_users_parser.set_defaults(func=seed_db_test_users) + + admin_reset_parser = subparsers.add_parser("admin_reset", help="Reseta o usuário admin (padrão: admin123)") + admin_reset_parser.set_defaults(func=reset_admin) + + admin_rotate_otp_parser = subparsers.add_parser("admin_rotate_otp", help="Rotaciona o OTP do usuário admin - se não definido em .env") + admin_rotate_otp_parser.set_defaults(func=rotate_admin_otp_cmd) + + return parser + + +def main(): + parser = build_parser() + args = parser.parse_args() + return args.func(args) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/seed_data.py b/scripts/seed_database.py similarity index 81% rename from seed_data.py rename to scripts/seed_database.py index e596282..714edc9 100644 --- a/seed_data.py +++ b/scripts/seed_database.py @@ -4,30 +4,29 @@ from functions.database import ( MaterialVendido, TipoMaterial, VendaJornalAvulso, AssinaturaAnual, RelatorioCotasMensais, RelatorioVendasMateriais, Setor, ComiteCentral, Usuario, Role, EmailMilitante, Endereco, - ComiteRegional, Celula, EstadoMilitante, get_db_connection + ComiteRegional, Celula, EstadoMilitante, get_db_session ) import random from faker import Faker -import time from werkzeug.security import generate_password_hash fake = Faker('pt_BR') -def criar_estrutura_organizacional(session): +def criar_estrutura_organizacional(db): """Cria a estrutura organizacional básica""" print("\nCriando estrutura organizacional...") # Criar Comitê Central cc = ComiteCentral(nome="Comitê Central SP") - session.add(cc) - session.flush() + db.add(cc) + db.flush() # Criar Comitês Regionais crs = [] for nome in ["CR São Paulo", "CR ABC", "CR Campinas"]: cr = ComiteRegional(nome=nome) - session.add(cr) - session.flush() + db.add(cr) + db.flush() crs.append(cr) # Criar Setores para cada CR @@ -38,8 +37,8 @@ def criar_estrutura_organizacional(session): nome=f"Setor {i+1} - {cr.nome}", cr_id=cr.id ) - session.add(setor) - session.flush() + db.add(setor) + db.flush() setores.append(setor) # Criar Células para cada Setor @@ -49,12 +48,12 @@ def criar_estrutura_organizacional(session): nome=f"Célula {i+1} - {setor.nome}", setor_id=setor.id ) - session.add(celula) + db.add(celula) - session.commit() + db.commit() return crs, setores -def criar_tipos_pagamento(session): +def criar_tipos_pagamento(db): """Cria tipos de pagamento padrão""" print("\nCriando tipos de pagamento...") tipos = [ @@ -65,11 +64,11 @@ def criar_tipos_pagamento(session): "Transferência Bancária" ] for tipo in tipos: - if not session.query(TipoPagamento).filter_by(descricao=tipo).first(): - session.add(TipoPagamento(descricao=tipo)) - session.commit() + if not db.query(TipoPagamento).filter_by(descricao=tipo).first(): + db.add(TipoPagamento(descricao=tipo)) + db.commit() -def criar_tipos_material(session): +def criar_tipos_material(db): """Cria tipos de material padrão""" print("\nCriando tipos de material...") tipos = [ @@ -80,11 +79,11 @@ def criar_tipos_material(session): "Cartilha" ] for tipo in tipos: - if not session.query(TipoMaterial).filter_by(descricao=tipo).first(): - session.add(TipoMaterial(descricao=tipo)) - session.commit() + if not db.query(TipoMaterial).filter_by(descricao=tipo).first(): + db.add(TipoMaterial(descricao=tipo)) + db.commit() -def criar_militantes(session, num_militantes, setores): +def criar_militantes(db, num_militantes, setores): """Cria militantes com todos os dados necessários""" print(f"\nCriando {num_militantes} militantes...") militantes = [] @@ -96,13 +95,6 @@ def criar_militantes(session, num_militantes, setores): nome = fake.name() cpf = fake.cpf() - # Email único - while True: - email = fake.email() - if email not in emails_usados: - emails_usados.add(email) - break - # Criar endereço endereco = Endereco( cep=fake.postcode(), @@ -113,12 +105,12 @@ def criar_militantes(session, num_militantes, setores): numero=str(random.randint(1, 999)), complemento=f"Bloco {random.randint(1, 10)}, Apto {random.randint(1, 999)}" if random.random() < 0.3 else None ) - session.add(endereco) - session.flush() + db.add(endereco) + db.flush() # Selecionar setor e célula aleatórios setor = random.choice(setores) - celula = random.choice(session.query(Celula).filter_by(setor_id=setor.id).all()) + celula = random.choice(db.query(Celula).filter_by(setor_id=setor.id).all()) # Definir responsabilidades responsabilidades = 0 @@ -168,27 +160,34 @@ def criar_militantes(session, num_militantes, setores): responsabilidades=responsabilidades, estado=random.choice(list(EstadoMilitante)) ) - session.add(militante) - session.flush() + db.add(militante) + db.flush() + # Email único + while True: + email = fake.email() + if email not in emails_usados: + emails_usados.add(email) + break + # Criar email do militante email_militante = EmailMilitante( militante_id=militante.id, endereco_email=email ) - session.add(email_militante) + db.add(email_militante) militantes.append(militante) - session.commit() + db.commit() except Exception as e: print(f"Erro ao criar militante {i+1}: {e}") - session.rollback() + db.rollback() continue return militantes -def criar_cotas(session, militantes): +def criar_cotas(db, militantes): """Cria cotas mensais para os militantes""" print("\nCriando cotas mensais...") for militante in militantes: @@ -205,16 +204,16 @@ def criar_cotas(session, militantes): data_vencimento=data_base + timedelta(days=30), pago=random.choice([True, False]) ) - session.add(cota) - session.commit() + db.add(cota) + db.commit() except Exception as e: print(f"Erro ao criar cotas para militante {militante.nome}: {e}") - session.rollback() + db.rollback() -def criar_pagamentos(session, militantes): +def criar_pagamentos(db, militantes): """Cria pagamentos para os militantes""" print("\nCriando pagamentos...") - tipos_pagamento = session.query(TipoPagamento).all() + tipos_pagamento = db.query(TipoPagamento).all() for militante in militantes: try: @@ -227,16 +226,16 @@ def criar_pagamentos(session, militantes): valor=random.uniform(50, 500), data_pagamento=fake.date_between(start_date='-1y', end_date='today') ) - session.add(pagamento) - session.commit() + db.add(pagamento) + db.commit() except Exception as e: print(f"Erro ao criar pagamentos para militante {militante.nome}: {e}") - session.rollback() + db.rollback() -def criar_materiais_vendidos(session, militantes): +def criar_materiais_vendidos(db, militantes): """Cria registros de materiais vendidos""" print("\nCriando materiais vendidos...") - tipos_material = session.query(TipoMaterial).all() + tipos_material = db.query(TipoMaterial).all() for militante in militantes: try: @@ -249,13 +248,13 @@ def criar_materiais_vendidos(session, militantes): valor=random.uniform(20, 100), data_venda=fake.date_time_between(start_date='-1y', end_date='now') ) - session.add(material) - session.commit() + db.add(material) + db.commit() except Exception as e: print(f"Erro ao criar materiais vendidos para militante {militante.nome}: {e}") - session.rollback() + db.rollback() -def criar_vendas_jornal(session, militantes): +def criar_vendas_jornal(db, militantes): """Cria vendas de jornal avulso""" print("\nCriando vendas de jornal...") for militante in militantes: @@ -270,16 +269,16 @@ def criar_vendas_jornal(session, militantes): valor_total=quantidade * valor_unitario, data_venda=fake.date_time_between(start_date='-1y', end_date='now') ) - session.add(venda) - session.commit() + db.add(venda) + db.commit() except Exception as e: print(f"Erro ao criar vendas de jornal para militante {militante.nome}: {e}") - session.rollback() + db.rollback() -def criar_assinaturas(session, militantes): +def criar_assinaturas(db, militantes): """Cria assinaturas anuais""" print("\nCriando assinaturas anuais...") - tipos_material = session.query(TipoMaterial).all() + tipos_material = db.query(TipoMaterial).all() for militante in militantes: try: @@ -294,42 +293,45 @@ def criar_assinaturas(session, militantes): data_inicio=data_inicio, data_fim=data_inicio + timedelta(days=365) ) - session.add(assinatura) - session.commit() + db.add(assinatura) + db.commit() except Exception as e: print(f"Erro ao criar assinatura para militante {militante.nome}: {e}") - session.rollback() + db.rollback() def seed_database(): """Função principal para popular o banco de dados""" - session = get_db_connection() + db = get_db_session() try: print("Iniciando população do banco de dados...") - # Criar estrutura organizacional - crs, setores = criar_estrutura_organizacional(session) - # Criar tipos básicos - criar_tipos_pagamento(session) - criar_tipos_material(session) + criar_tipos_pagamento(db) + criar_tipos_material(db) + + # Criar estrutura organizacional + crs, setores = criar_estrutura_organizacional(db) # Criar militantes (30 militantes para teste) - militantes = criar_militantes(session, 30, setores) + militantes = criar_militantes(db, 30, setores) # Criar dados financeiros e materiais - criar_cotas(session, militantes) - criar_pagamentos(session, militantes) - criar_materiais_vendidos(session, militantes) - criar_vendas_jornal(session, militantes) - criar_assinaturas(session, militantes) + criar_cotas(db, militantes) + criar_pagamentos(db, militantes) + criar_materiais_vendidos(db, militantes) + criar_vendas_jornal(db, militantes) + criar_assinaturas(db, militantes) print("\nBanco de dados populado com sucesso!") except Exception as e: print(f"Erro durante a população do banco: {e}") - session.rollback() + db.rollback() finally: - session.close() + db.close() if __name__ == "__main__": - seed_database() \ No newline at end of file + seed_database() + + + \ No newline at end of file diff --git a/services/auth_service.py b/services/auth_service.py index 26161dd..689bbb9 100644 --- a/services/auth_service.py +++ b/services/auth_service.py @@ -1,11 +1,8 @@ -from functions.database import get_db_connection, Usuario +from functions.database import get_db_session, Usuario from flask_login import login_user, logout_user from datetime import datetime from typing import Dict, Optional -import pyotp -import qrcode -import base64 -from io import BytesIO +from services.otp_service import generate_qr_code_base64 class AuthService: """Service para operações de autenticação""" @@ -13,7 +10,7 @@ class AuthService: @staticmethod def autenticar_usuario(email_or_username: str, password: str, otp: str = None) -> Dict: """Autentica um usuário""" - db = get_db_connection() + db = get_db_session() try: # Tenta encontrar o usuário por email ou username user = db.query(Usuario).filter( @@ -64,7 +61,7 @@ class AuthService: @staticmethod def desautenticar_usuario(user) -> Dict: """Desautentica um usuário""" - db = get_db_connection() + db = get_db_session() try: if user: user.logout() @@ -87,7 +84,7 @@ class AuthService: @staticmethod def alterar_senha(user_id: int, senha_atual: str, nova_senha: str) -> Dict: """Altera a senha de um usuário""" - db = get_db_connection() + db = get_db_session() try: user = db.query(Usuario).get(user_id) if not user: @@ -122,20 +119,7 @@ class AuthService: @staticmethod def gerar_qr_code(user) -> str: """Gera um QR code para o usuário""" - if not user.otp_secret: - user.otp_secret = pyotp.random_base32() - - totp = pyotp.TOTP(user.otp_secret) - qr = qrcode.QRCode(version=1, box_size=10, border=5) - qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles")) - qr.make(fit=True) - - img = qr.make_image(fill_color="black", back_color="white") - buffer = BytesIO() - img.save(buffer, format="PNG") - qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8') - - return qr_code + return generate_qr_code_base64(user) @staticmethod def verificar_sessao(user) -> Dict: @@ -154,4 +138,4 @@ class AuthService: return { 'valid': True - } \ No newline at end of file + } diff --git a/services/dashboard_service.py b/services/dashboard_service.py index 4e649bb..72ec2dc 100644 --- a/services/dashboard_service.py +++ b/services/dashboard_service.py @@ -1,4 +1,4 @@ -from functions.database import get_db_connection, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento +from functions.database import get_db_session, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento from sqlalchemy import func from sqlalchemy.orm import joinedload from datetime import datetime, timedelta @@ -15,7 +15,7 @@ class DashboardService: @cached(expire=300, key_prefix="dashboard") # Cache for 5 minutes def get_dashboard_stats() -> Dict[str, Any]: """Get dashboard statistics with caching""" - db = get_db_connection() + db = get_db_session() try: # Get cached stats first cache_key = CacheKeys.DASHBOARD_STATS @@ -146,7 +146,7 @@ class DashboardService: @cached(expire=600, key_prefix="dashboard") # Cache for 10 minutes def get_militante_stats() -> Dict[str, Any]: """Get militante-specific statistics""" - db = get_db_connection() + db = get_db_session() try: # Militantes por estado estados = db.query(Militante.estado, func.count(Militante.id)).group_by(Militante.estado).all() @@ -175,7 +175,7 @@ class DashboardService: @cached(expire=300, key_prefix="dashboard") def get_financial_stats() -> Dict[str, Any]: """Get financial statistics""" - db = get_db_connection() + db = get_db_session() try: # Total de pagamentos total_pagamentos = db.query(func.sum(Pagamento.valor)).scalar() @@ -218,7 +218,7 @@ class DashboardService: @staticmethod def obter_ultimos_militantes(limite: int = 5) -> List[Militante]: """Obtém os últimos militantes cadastrados""" - db = get_db_connection() + db = get_db_session() try: return db.query(Militante).order_by(Militante.id.desc()).limit(limite).all() finally: @@ -227,7 +227,7 @@ class DashboardService: @staticmethod def obter_ultimos_pagamentos(limite: int = 5) -> List[Pagamento]: """Obtém os últimos pagamentos realizados""" - db = get_db_connection() + db = get_db_session() try: return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).limit(limite).all() finally: @@ -236,7 +236,7 @@ class DashboardService: @staticmethod def obter_tipos_pagamento() -> List[TipoPagamento]: """Obtém todos os tipos de pagamento""" - db = get_db_connection() + db = get_db_session() try: return db.query(TipoPagamento).all() finally: diff --git a/services/otp_service.py b/services/otp_service.py new file mode 100644 index 0000000..fee160b --- /dev/null +++ b/services/otp_service.py @@ -0,0 +1,19 @@ +import base64 +import pyotp +import qrcode +from io import BytesIO + +def generate_qr_code(user): + """Gera imagem PIL do QR code OTP para o usuário.""" + qr = qrcode.QRCode(version=1, box_size=10, border=5) + qr.add_data(user.get_otp_uri()) + qr.make(fit=True) + return qr.make_image(fill_color="black", back_color="white") + + +def generate_qr_code_base64(user): + """Gera QR code OTP codificado em base64 (PNG).""" + img = generate_qr_code(user) + buffer = BytesIO() + img.save(buffer, format="PNG") + return base64.b64encode(buffer.getvalue()).decode("utf-8") diff --git a/sql/migrate_db.py b/sql/migrate_db.py index 17081d9..3ae4475 100644 --- a/sql/migrate_db.py +++ b/sql/migrate_db.py @@ -2,10 +2,15 @@ import os import sqlite3 import sys from pathlib import Path +from dotenv import load_dotenv # Adiciona o diretório raiz ao PYTHONPATH -root_dir = str(Path(__file__).parent.parent) -sys.path.append(root_dir) +ROOT_DIR = Path(__file__).resolve().parents[1] +if str(ROOT_DIR) not in sys.path: + sys.path.insert(0, str(ROOT_DIR)) + +# Carregar .env antes de importar módulos +load_dotenv(ROOT_DIR / ".env") from functions.base import Base, engine from functions.database import init_database @@ -63,4 +68,4 @@ def migrate_database(): print("Migração concluída com sucesso!") if __name__ == '__main__': - migrate_database() \ No newline at end of file + migrate_database() diff --git a/sql/migrate_rbac.py b/sql/migrate_rbac.py index 05b854f..314d60c 100644 --- a/sql/migrate_rbac.py +++ b/sql/migrate_rbac.py @@ -1,25 +1,25 @@ -from functions.database import get_db_connection, Usuario +from functions.database import get_db_session, Usuario from functions.rbac import Role, Permission def migrate_existing_users(): """Migra os usuários existentes para o novo sistema RBAC""" - session = get_db_connection() + db = get_db_session() try: # Buscar todos os usuários - usuarios = session.query(Usuario).all() + usuarios = db.query(Usuario).all() # Buscar ou criar role de administrador - admin_role = session.query(Role).filter_by(nome="Administrador").first() + admin_role = db.query(Role).filter_by(nome="Administrador").first() if not admin_role: admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL) - session.add(admin_role) + db.add(admin_role) # Buscar ou criar role de militante básico - militante_role = session.query(Role).filter_by(nome="Militante Básico").first() + militante_role = db.query(Role).filter_by(nome="Militante Básico").first() if not militante_role: militante_role = Role(nome="Militante Básico", nivel=Role.MILITANTE_BASICO) - session.add(militante_role) + db.add(militante_role) # Atualizar usuários for usuario in usuarios: @@ -33,15 +33,15 @@ def migrate_existing_users(): else: usuario.roles.append(militante_role) - session.commit() + db.commit() print("Migração de usuários concluída com sucesso!") except Exception as e: - session.rollback() + db.rollback() print(f"Erro durante a migração de usuários: {str(e)}") raise e finally: - session.close() + db.close() if __name__ == '__main__': migrate_existing_users() \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index ffbd6c3..56e27df 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,6 @@ import pytest from app import create_app -from functions.database import init_database, get_db_connection +from functions.database import init_database, get_db_session @pytest.fixture def app(): @@ -15,7 +15,7 @@ def app(): yield app # Limpar banco após os testes - db = get_db_connection() + db = get_db_session() try: db.execute('DROP TABLE IF EXISTS usuarios CASCADE') db.commit() diff --git a/tests/test_admin_routes.py b/tests/test_admin_routes.py index 5e897aa..e3af940 100644 --- a/tests/test_admin_routes.py +++ b/tests/test_admin_routes.py @@ -1,13 +1,13 @@ import pytest from flask import url_for -from functions.database import Usuario, get_db_connection +from functions.database import Usuario, get_db_session from werkzeug.security import generate_password_hash import json @pytest.fixture def admin_user(client): """Fixture que cria um usuário admin para testes""" - db = get_db_connection() + db = get_db_session() try: admin = Usuario( username='admin_test', @@ -74,7 +74,7 @@ def test_toggle_status(auth_admin_client, admin_user): def test_acesso_nao_admin(client): """Testa acesso de usuário não admin""" - db = get_db_connection() + db = get_db_session() try: # Criar usuário normal user = Usuario( diff --git a/tests/test_menu_navigation.py b/tests/test_menu_navigation.py index 9c63a26..43b463a 100644 --- a/tests/test_menu_navigation.py +++ b/tests/test_menu_navigation.py @@ -1,7 +1,7 @@ import pytest from flask import url_for from flask_login import login_user -from functions.database import get_db_connection, Usuario +from functions.database import get_db_session, Usuario import pyotp class TestMenuNavigation: