6 Commits

Author SHA1 Message Date
6dcf2aab7b pequeníssimas correções 2026-03-06 18:07:50 -03:00
246015563b Pequenas correções
- atualizando requirements
- gitignore com as extensoes de database
- garantindo admin com role associada
2026-03-06 17:37:54 -03:00
LS
bb9d7cf8a6 fix: compatibilidade Python 3.14 - Pillow e SQLAlchemy
- Pillow: 10.0.1 -> >=10.4.0 (fix KeyError __version__ no build)
- SQLAlchemy: 2.0.21 -> >=2.0.36 (fix AssertionError TypingOnly)
- Remove import não utilizado safe_rollback em militante_controller

Made-with: Cursor
2026-03-05 21:35:33 -03:00
525b4530c0 fix typos e declaração de volume na raiz do composer_cross é desnecessaria 2026-02-28 11:09:36 -03:00
2b1668206d - inits centralizados, READMEs atualizados
- padronizando o nome de get_db_connection e session para get_db_session, para não confundir com session do Flask ou sessoes web

- corrigindo potenciais erros

-- has_permission nao consegue com lazy load carregar permission depois de load_user fechar a conexao, entao joinedLoad com Permission antes de fechar

-- db.rollback não existe caso db = get_db_session() apareça muito depois dentro do try, padronizando antes de try

--- comparar role por nivel (Role.SECRETARIO_GERAL) e nao por nome ("Secretario Geral")

- unificacao de get_otp_qr_code

- mudança de nowutc() para now(UTC) conforme novo padrão
2026-02-20 17:19:15 -03:00
6882b57081 get_db_connection duplicado, consolidando em base.py 2026-02-16 21:49:23 -03:00
47 changed files with 1269 additions and 1233 deletions

2
.gitignore vendored
View File

@@ -260,6 +260,8 @@ poetry.toml
pyrightconfig.json
database.db
database.db-shm
database.db-wal
admin_qr.png
# End of https://www.toptal.com/developers/gitignore/api/python,flask

View File

@@ -1,48 +1,48 @@
FROM alpine:latest
# Instalar dependências do sistema
RUN apk update && \
apk add --no-cache \
python3 \
py3-pip \
make \
git \
gcc \
python3-dev \
musl-dev \
linux-headers
# Criar link simbólico para python3
RUN ln -sf python3 /usr/bin/python
# Definir diretório de trabalho
# Diretório de trabalho
WORKDIR /app
# Copiar arquivos do projeto
# UID/GID configuráveis para compatibilizar permissões em bind mounts Linux
ARG APP_UID=1000
ARG APP_GID=1000
# Instalar Python no Alpine e criar alias `python`
RUN apk add --no-cache python3 py3-pip \
&& ln -sf python3 /usr/bin/python
# Instalar dependências Python em venv usando build deps temporários
COPY requirements.txt .
RUN apk add --no-cache --virtual .build-deps \
gcc \
musl-dev \
linux-headers \
&& python -m venv /venv \
&& /venv/bin/pip install --upgrade pip \
&& /venv/bin/pip install --no-cache-dir -r requirements.txt \
&& apk del .build-deps
# Copiar código da aplicação
COPY . .
# Criar e ativar ambiente virtual
RUN python -m venv /venv && \
. /venv/bin/activate && \
pip install --upgrade pip && \
pip install -r requirements.txt
# Criar usuário sem privilégios e diretórios de escrita necessários
RUN addgroup -S -g "${APP_GID}" appgroup \
&& adduser -S -D -H -u "${APP_UID}" -G appgroup appuser \
&& mkdir -p /data /app/logs \
&& chown -R appuser:appgroup /app /data /venv
# Ambiente padrão
ENV PATH="/venv/bin:$PATH" \
FLASK_APP=app.py \
FLASK_ENV=production \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
# Rodar aplicação como usuário não-root
USER appuser
# Expor a porta que o Flask usa
EXPOSE 5000
# Definir o ambiente virtual como padrão
ENV PATH="/venv/bin:$PATH"
ENV FLASK_APP=app.py
ENV FLASK_ENV=production
# Criar script de inicialização
RUN echo '#!/bin/sh' > /app/start.sh && \
echo 'echo "Inicializando banco de dados..."' >> /app/start.sh && \
echo 'python init_db.py' >> /app/start.sh && \
echo 'echo "Banco de dados inicializado!"' >> /app/start.sh && \
echo 'echo "Iniciando aplicação..."' >> /app/start.sh && \
echo 'exec gunicorn --bind 0.0.0.0:5000 app:app' >> /app/start.sh && \
chmod +x /app/start.sh
# Comando para rodar a aplicação
CMD ["/app/start.sh"]
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]

113
Makefile
View File

@@ -1,58 +1,127 @@
.PHONY: install clean db-reset db-seed-fake db-seed-test-users admin-reset admin-rotate-otp \
run run-gunicorn docker-db-reset docker-db-seed-fake docker-db-seed-test-users \
docker-admin-reset docker-admin-rotate-otp docker-build docker-up docker-down docker-logs \
docker-restart docker-db-reset-xplat docker-db-seed-fake-xplat docker-db-seed-test-users-xplat \
docker-admin-reset-xplat docker-admin-rotate-otp-xplat docker-build-xplat docker-up-xplat \
docker-down-xplat docker-logs-xplat cache-clear cache-status cache-keys dev-up dev-down \
prod-build prod-up prod-logs cache-warmup cache-monitor
install:
pip install -r requirements.txt
clean:
rm -rf ~/.local/share/controles/database.db*
rm -f ~/.local/share/controles/database.db*
rm -f database.db*
rm -f data/database.db*
rm -f admin_qr.png
rm -f data/admin_qr.png
rm -f /tmp/admin_qr.png
find . -type d -name "__pycache__" -prune -exec rm -rf {} +
init-db: clean
python init_db.py
db-reset: clean
PYTHONUNBUFFERED=1 python -B scripts/manage.py db_reset
seed: init-db
python seed.py
# Apenas seed (seed_database.py)
db-seed-fake:
PYTHONUNBUFFERED=1 python -B scripts/manage.py db_seed_fake
init:
python app.py --init
# Apenas seed (create_test_users.py)
db-seed-test-users:
PYTHONUNBUFFERED=1 python -B scripts/manage.py db_seed_test_users
# Busca o OTP padrão
admin-reset:
PYTHONUNBUFFERED=1 python -B scripts/manage.py admin_reset
# Novo OTP
admin-rotate-otp:
PYTHONUNBUFFERED=1 python -B scripts/manage.py admin_rotate_otp
# Server padrão do python
run:
python app.py
PYTHONUNBUFFERED=1 python -B app.py
run-with-seed: seed init run
reset-admin: clean
python create_admin.py
# server padrão de produção (recomendado)
run-gunicorn:
PYTHONUNBUFFERED=1 python -B -m gunicorn --bind 0.0.0.0:5000 app:app
# Docker commands
docker-db-reset:
mkdir -p data logs
docker-compose -f docker-compose.yml exec app python -B scripts/manage.py db_reset
docker-db-seed-fake:
docker-compose -f docker-compose.yml exec app python -B scripts/manage.py db_seed_fake
docker-db-seed-test-users:
docker-compose -f docker-compose.yml exec app python -B scripts/manage.py db_seed_test_users
docker-admin-reset:
docker-compose -f docker-compose.yml exec app python -B scripts/manage.py admin_reset
docker-admin-rotate-otp:
docker-compose -f docker-compose.yml exec app python -B scripts/manage.py admin_rotate_otp
docker-build:
docker-compose build
mkdir -p data logs
docker-compose -f docker-compose.yml build
docker-up:
docker-compose up -d
mkdir -p data logs
docker-compose -f docker-compose.yml up -d
docker-down:
docker-compose down
docker-compose -f docker-compose.yml down
docker-logs:
docker-compose logs -f
docker-compose -f docker-compose.yml logs -f
docker-restart:
docker-compose restart
docker-compose -f docker-compose.yml restart
# Docker commands (fallback cross-platform)
docker-db-reset-xplat:
docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py db_reset
docker-db-seed-fake-xplat:
docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py db_seed_fake
docker-db-seed-test-users-xplat:
docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py db_seed_test_users
docker-admin-reset-xplat:
docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py admin_reset
docker-admin-rotate-otp-xplat:
docker-compose -f docker-compose.crossplatform.yml exec app python -B scripts/manage.py admin_rotate_otp
docker-build-xplat:
mkdir -p data logs
docker-compose -f docker-compose.crossplatform.yml build
docker-up-xplat:
docker-compose -f docker-compose.crossplatform.yml up -d
docker-down-xplat:
docker-compose -f docker-compose.crossplatform.yml down
docker-logs-xplat:
docker-compose -f docker-compose.crossplatform.yml logs -f
# Redis cache commands
cache-clear:
docker-compose exec redis redis-cli FLUSHDB
docker-compose -f docker-compose.yml exec redis redis-cli FLUSHDB
cache-status:
docker-compose exec redis redis-cli INFO
docker-compose -f docker-compose.yml exec redis redis-cli INFO
cache-keys:
docker-compose exec redis redis-cli KEYS "*"
docker-compose -f docker-compose.yml exec redis redis-cli KEYS "*"
# Development with Docker
dev-up: docker-build docker-up
@echo "Development environment started with Redis cache"
@echo "Application: http://localhost:5000"
@echo "Redis: localhost:6379"
dev-down: docker-down
@echo "Development environment stopped"
@@ -77,4 +146,4 @@ cache-warmup:
cache-monitor:
@echo "Monitoring Redis cache..."
watch -n 5 'docker-compose exec redis redis-cli INFO memory'
watch -n 5 'docker-compose -f docker-compose.yml exec redis redis-cli INFO memory'

313
README.md
View File

@@ -1,10 +1,258 @@
# Sistema de Controle de Militantes
# Sistema de Controles OCI
Sistema para gerenciamento de militantes, células, setores e comitês regionais.
Sistema web para gestão organizacional (militantes, estrutura hierárquica, cotas, pagamentos e materiais), com autenticação por senha + OTP, permissões RBAC e cache Redis.
## 🔧 Tecnologias
- **Backend**: Flask 3.0.2
- **Frontend**: Bootstrap 5, HTML5, CSS3, JavaScript
- **Database**: SQLite + SQLAlchemy 2.0+ (>= 2.0.36)
- **Cache**: Redis 7.4.4 (opcional fora do Docker)
- **Authentication**: Flask-Login + OTP (pyotp)
- **Container**: Docker + Docker Compose
- **Server**: Gunicorn
## 🚀 Status Atual
- Sistema com Arquitetura de Permissões (RBAC)
- Sistema de permissões implementado no nível de dados
- Estrutura organizacional completa
- Aplicação Flask rodando com Docker
- Redis cache integrado e funcionando
- Banco de dados SQLite inicializado
- Usuário admin configurado com OTP
- 30 militantes de teste criados
- Menus sempre visíveis, controle transparente
## 🏗️ Arquitetura de Permissões
O sistema implementa uma estratégia de controle de permissões no **nível de dados**, garantindo que:
- **Menus permanecem sempre visíveis** - Não há restrições na interface
- **Dados são filtrados por hierarquia** - Admin → CC → CR → Setor → Célula
- **Templates nunca quebram** - Sempre renderizam, mesmo com dados vazios
## ⚙️ Instalação - Pré-requisitos
- Docker + Docker Compose (para fluxo com containers)
- Porta 5000 disponível para a aplicação
- Porta 6379 disponível para Redis
- Python 3.10+ (recomendado)
- `pip`
- `make`
## 🐳 Primeiro Inicio com Docker (recomendado)
### 0. Clone o repositorio
```bash
git clone git@gitea.comunatec.org:comunatec/controles.git
cd controles
```
### 1. Resete o banco
```bash
make docker-db-reset
```
### 2. Adicione dados fakes para testes (opcional)
```bash
make docker-db-seed-fake
```
### 3. Subir aplicação
```bash
make dev-up
```
### 4. Acompanhar logs
- Aplicação: `logs/controles.log`
- Cache: `logs/cache.log`
- Docker: `docker-compose logs`
```bash
make docker-logs # Toda a aplicação
docker-compose logs redis # Somente o redis
make cache-status # INFO do redis
```
### 5. Descer aplicação
```bash
make dev-down
```
## 🐍 Primeiro Inicio - Execução Local (Sem Docker)
### 0. Clone o repositorio
```bash
git clone git@gitea.comunatec.org:comunatec/controles.git
cd controles
```
### 1. Ambiente Python
```bash
python -m venv .venv
source .venv/bin/activate # Linux/Mac
# ou
venv\Scripts\activate # Windows
pip install -r requirements.txt
```
### 2. Crie o `.env` na raiz
Exemplo:
```env
# Usando OTP padrão para não trocar toda hora no desenvolvimento
ADMIN_OTP_SECRET=JBSWY3DPEHPK3PXP
# Para usar o mesmo banco que o Docker (Linux/WSL permite bind-mount)
DATABASE_URL=sqlite:///data/database.db
REDIS_URL=redis://redis:6379/0
FLASK_APP=app.py
FLASK_ENV=development
SECRET_KEY=troque_esta_chave
APP_UID=1000
APP_GID=1000
MAIL_SERVER=seu_servidor_smtp
MAIL_PORT=587
MAIL_USE_TLS=True
MAIL_USERNAME=seu_email
MAIL_PASSWORD=sua_senha
```
Se Redis não estiver disponível localmente, a aplicação continua rodando sem cache.
### 3. Inicialize o banco e rode
```bash
make db-reset
make db-seed-fake # opcional
make run
# ou
make run-gunicorn # server de produção
```
## 🔐 Acesso ao Sistema
### Credenciais do Admin
- **URL**: http://localhost:5000
- **Usuário**: admin
- **Senha**: admin123
- **OTP Secret**: JBSWY3DPEHPK3PXP
### Configuração OTP
1. Instale um aplicativo autenticador (Google Authenticator, Microsoft Authenticator)
2. Configure manualmente:
- Descrição da chave (Codinome): Controles-OCI-admin
- Segredo OTP (Sua Chave): JBSWY3DPEHPK3PXP
- Tipo: TOTP
- Algoritmo: SHA1
- Dígitos: 6
- Intervalo: 30 segundos
**OU** use o QR Code gerado em `/tmp/admin_qr.png` ou `/data/admin_qr.png` ou `admin_qr.png`.
PS: Google Authenticator só tem "Codinome" e "Sua Chave" de config, e tá tudo bem.
## Testes Automatizados
```bash
# ambiente já configurado
pip install -r tests/requirements-test.txt
pytest
```
Também existe `run_tests.sh`, que monta um venv e executa a suíte automaticamente.
- TODO: Talvez trocar o nome para venv_test
## 📁 Estrutura de arquivos
O sistema busca seguir padrão MVC (Model-View-Controller), atualmente está:
```
controles/
├── controllers/ # Controladores (lógica de rotas)
├── data/ # Banco de dados (e talvez qr_code admin)
├── docs/ # Documentações da arquitetura
├── functions/ # Funções utilitárias
├── logs/ # Logs de aplicação, redis...
├── migrations/ # Alterações de banco para não perder dados (produção)
├── models/ # Modelos (operações de banco)
├── routes/ # Rotas de aplicação
├── scripts/ # Scripts de gerenciamento
├── services/ # Serviços (lógica de negócio)
├── sql/ # Migrate para o rbac
├── static/ # Arquivos estáticos (icon/css/js)
├── templates/ # Views (templates HTML)
├── tests/ # Testes automatizados
├── utils/ # Funções sem regra de negócio ou dependencia de domínio
├── app.py # Ponto de entrada da aplicação
├── docker-compose.yml # Configuração Docker
├── Dockerfile # Imagem Docker
└── requirements.txt # Dependências Python
```
- TODO: temos duas rotas (routes e controllers)? Unificar futuramente.
- TODO: sql/migrate_db parece utilizar outro banco.
## 🤝 Contribuição
1. Crie uma branch para sua feature
2. Commit suas mudanças
3. Push sua branch para o Gitea
4. Outro camarada verifica a branch
5. Abra um Pull Request para a branch solicitada
## 📄 Licença
Este projeto é privado para uso da OCI.
## 🔍 Troubleshooting
1. **Redis não conecta**
```bash
docker-compose logs redis
docker-compose restart redis
```
- Redis está indisponível localmente, mas app continua executando mesmo fora do Docker.
2. **Cache não funciona**
```bash
make cache-status
make cache-clear
```
3. **Aplicação não inicia**
```bash
docker-compose logs app
docker-compose down && docker-compose up -d
```
4. **Modificações no banco local não alteram o banco no Docker**
- Linux bind mount no grupo de usuario errado: ajuste `APP_UID`/`APP_GID` no `Dockerfile` para seu grupo de usuarios (padrão=1000).
- Docker com engine do Windows não consegue fazer bind mount, então alterações no banco local não refletem no banco do Docker, use as operações dentro do docker com make docker-* ou no windows instale o wsl2 e instale o Docker com apenas a engine "Docker no WSL".
## Estrutura de Permissões (RBAC)
O sistema utiliza um sistema de controle de acesso baseado em papéis (RBAC) com a seguinte hierarquia:
O sistema utiliza um sistema de controle de acesso baseado em papéis (RBAC), onde a verificação de ações são feitas com permissões (permission), e as permissões são pré-definidas com base em papeis (role). Que possuem a seguinte hierarquia:
### Níveis de Papéis
@@ -47,41 +295,7 @@ O sistema utiliza um sistema de controle de acesso baseado em papéis (RBAC) com
- Criar CRs
- Configurar sistema
## Instalação
1. Clone o repositório
2. Crie um ambiente virtual:
```bash
python -m venv venv
source venv/bin/activate # Linux/Mac
# ou
venv\Scripts\activate # Windows
```
3. Instale as dependências:
```bash
pip install -r requirements.txt
```
4. Execute as migrações do banco de dados:
```bash
python sql/migrate_db.py
```
5. Configure as variáveis de ambiente no arquivo `.env`:
```
FLASK_APP=app.py
FLASK_ENV=development
SECRET_KEY=sua_chave_secreta
MAIL_SERVER=seu_servidor_smtp
MAIL_PORT=587
MAIL_USE_TLS=True
MAIL_USERNAME=seu_email
MAIL_PASSWORD=sua_senha
```
6. Execute o aplicativo:
```bash
flask run
```
## Uso
## Uso do RBAC
### Decoradores de Permissão
@@ -99,31 +313,32 @@ O sistema fornece três decoradores para controle de acesso:
- Verifica se o usuário tem um papel com nível mínimo
- Exemplo: `@require_minimum_role(Role.SECRETARIO_CR)`
### Verificando Permissões no Código
### Verificando Permissões e Papéis no Código
```python
# Verificar se um usuário tem uma permissão
if user.has_permission('create_cell_member'):
if user.has_permission(Permission.CREATE_CELL_MEMBER):
# Faça algo
# Verificar se um usuário tem um papel
if user.has_role('Secretário de Célula'):
if user.has_role(Role.SECRETARIO_CELULA):
# Faça algo
# Obter o papel mais alto do usuário
highest_role = user.get_highest_role()
if highest_role and highest_role.nivel >= Role.SECRETARIO_CR:
# Verificar se o usuário tem nível secretário de célula ou superior
if user.has_minimum_role(Role.SECRETARIO_CELULA):
# Faça algo
```
## Estrutura do Banco de Dados
## Documentação Complementar
O sistema utiliza as seguintes tabelas para o RBAC:
- `roles`: Armazena os papéis disponíveis
- `permissions`: Armazena as permissões disponíveis
- `role_permissions`: Mapeia papéis para permissões
- `user_roles`: Mapeia usuários para papéis
- Documentação complementar: `docs/README.md`
- RBAC: `docs/rbac.md`
- Estratégia de permissões: `docs/permission_strategy.md`
- Redis e cache: `docs/redis_cache_setup.md`
- Histórico de correções de permissões: `docs/permission_fixes_summary.md`
## Segurança

68
app.py
View File

@@ -1,19 +1,24 @@
import os
import secrets
import sys
import logging
import time
from pathlib import Path
from flask import Flask
from flask_bootstrap5 import Bootstrap
from flask_bootstrap import Bootstrap5
from flask_login import LoginManager
from flask_wtf.csrf import CSRFProtect
from flask_mail import Mail
from functions.database import get_db_connection, Usuario
from functions.rbac import init_rbac
from functions.template_helpers import permission_context_processor, init_template_filters, safe_render_helper
from sqlalchemy.orm import joinedload
import os
import secrets
from dotenv import load_dotenv
import sys
import logging
# Carregar .env antes de importar módulos
load_dotenv(Path(__file__).resolve().parent / ".env")
from functions.database import get_db_session, Usuario
from functions.rbac import Role
from functions.template_helpers import permission_context_processor, init_template_filters, safe_render_helper
from logging.handlers import RotatingFileHandler
import time
# Importar blueprints
from controllers.auth_controller import auth_bp
@@ -28,8 +33,6 @@ from routes.admin import admin_bp
# Import cache service
from services.cache_service import cache_service
load_dotenv()
def setup_logging(app):
"""Configure logging for the application"""
if not app.debug and not app.testing:
@@ -69,7 +72,7 @@ def create_app():
setup_logging(app)
# Configurar Bootstrap
bootstrap = Bootstrap(app)
bootstrap = Bootstrap5(app)
# Configurar CSRF Protection (desabilitado temporariamente)
# csrf = CSRFProtect()
@@ -99,12 +102,11 @@ def create_app():
@login_manager.user_loader
def load_user(user_id):
"""Carrega o usuário pelo ID"""
db = get_db_connection()
"""Carrega o usuário pelo ID com roles e permissions (eager)."""
db = get_db_session()
try:
# Carregar o usuário com suas roles
user = db.query(Usuario).options(
joinedload(Usuario.roles)
joinedload(Usuario.roles).joinedload(Role.permissions)
).get(user_id)
return user
finally:
@@ -162,21 +164,6 @@ def create_app():
return app
def init_system():
"""Inicializa o sistema"""
print("Inicializando sistema...")
# Inicializar RBAC
print("Inicializando RBAC...")
init_rbac()
# Criar usuário admin se não existir
from create_admin import create_admin_user
print("Criando usuário admin...")
create_admin_user()
print("Sistema inicializado com sucesso!")
def main():
"""Função principal"""
# Criar a aplicação
@@ -187,12 +174,13 @@ def main():
app = main()
if __name__ == '__main__':
# Verificar se é para inicializar o sistema
if '--init' in sys.argv:
init_system()
else:
app.run(
host='0.0.0.0',
port=5000,
debug=os.getenv('FLASK_ENV') == 'development'
)
if len(sys.argv) > 1:
print("app.py não aceita argumentos.")
print("Use 'python scripts/manage.py --help' para comandos administrativos.")
raise SystemExit(2)
app.run(
host='0.0.0.0',
port=5000,
debug=os.getenv('FLASK_ENV') == 'development'
)

View File

@@ -1,13 +1,10 @@
from flask import Blueprint, request, render_template, redirect, url_for, flash, session, jsonify
from flask_login import login_user, logout_user, current_user
from datetime import datetime
from functions.database import get_db_connection, Usuario
from functions.database import Militante, get_db_session, Usuario
from functions.decorators import require_login
from werkzeug.security import generate_password_hash
import pyotp
import qrcode
import base64
from io import BytesIO
from services.otp_service import generate_qr_code_base64
auth_bp = Blueprint('auth', __name__)
@@ -30,7 +27,7 @@ def login():
flash("Email/usuário e senha são obrigatórios.", "danger")
return redirect(url_for("auth.login"))
db = get_db_connection()
db = get_db_session()
try:
# Tenta encontrar o usuário por email ou username
user = db.query(Usuario).filter(
@@ -105,7 +102,7 @@ def api_login():
'error': 'Email/username e senha são obrigatórios'
}), 400
db = get_db_connection()
db = get_db_session()
try:
# Buscar usuário
user = db.query(Usuario).filter(
@@ -182,7 +179,7 @@ def api_logout():
"""Endpoint de logout API"""
try:
if current_user.is_authenticated:
db = get_db_connection()
db = get_db_session()
try:
user = current_user
user.logout()
@@ -226,7 +223,7 @@ def api_status():
@auth_bp.route("/logout")
@require_login
def logout():
db = get_db_connection()
db = get_db_session()
try:
user = current_user
if user:
@@ -255,7 +252,7 @@ def alterar_senha():
flash("As senhas não coincidem.", "error")
return redirect(url_for("auth.alterar_senha"))
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(current_user.id)
if not user.check_password(senha_atual):
@@ -274,31 +271,14 @@ def alterar_senha():
@auth_bp.route("/qr/<token>")
def get_qr_code(token):
"""Gera QR code para configuração OTP"""
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).filter_by(temp_token=token).first()
if not militante or militante.temp_token_expiry < datetime.now():
flash('Token inválido ou expirado.', 'danger')
return redirect(url_for('auth.login'))
qr_code = generate_qr_code(militante)
qr_code = generate_qr_code_base64(militante)
return render_template('mostrar_qr_code.html', qr_code=qr_code)
finally:
db.close()
def generate_qr_code(user):
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify
from functions.database import get_db_connection, CotaMensal, Militante
from functions.database import get_db_session, CotaMensal, Militante
from functions.decorators import require_login
from utils.date_utils import validar_data, converter_data
from datetime import datetime
@@ -12,6 +12,7 @@ cota_bp = Blueprint('cota', __name__)
def novo():
"""Cria uma nova cota mensal"""
if request.method == "POST":
db = get_db_session()
try:
militante_id = request.form.get("militante_id")
valor_antigo = float(request.form.get("valor_antigo"))
@@ -23,7 +24,6 @@ def novo():
flash('Data inválida ou futura', 'danger')
return redirect(url_for('cota.novo'))
db = get_db_connection()
cota = CotaMensal(
militante_id=militante_id,
valor_antigo=valor_antigo,
@@ -44,7 +44,7 @@ def novo():
db.close()
# GET - Renderizar formulário
db = get_db_connection()
db = get_db_session()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
return render_template("nova_cota.html", militantes=militantes)
@@ -55,7 +55,7 @@ def novo():
@require_login
def listar():
"""Lista todas as cotas mensais com controle de permissões no nível de dados"""
db = get_db_connection()
db = get_db_session()
try:
# SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões
cotas = []
@@ -82,7 +82,7 @@ def listar():
@require_login
def editar(id):
"""Edita uma cota mensal"""
db = get_db_connection()
db = get_db_session()
try:
cota = db.query(CotaMensal).get(id)
if not cota:
@@ -114,7 +114,7 @@ def editar(id):
@require_login
def excluir(id):
"""Exclui uma cota mensal"""
db = get_db_connection()
db = get_db_session()
try:
cota = db.query(CotaMensal).get(id)
if not cota:

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, render_template, flash, redirect, url_for, jsonify
from functions.database import get_db_connection, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento
from functions.database import get_db_session, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento
from functions.decorators import require_login
from datetime import datetime
from sqlalchemy import func
@@ -28,7 +28,7 @@ def dashboard():
stats = DashboardService.get_dashboard_stats()
# Get tipos de pagamento for the modal
db = get_db_connection()
db = get_db_session()
try:
tipos_pagamento = db.query(TipoPagamento).all()
finally:

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify
from functions.database import get_db_connection, MaterialVendido, Militante, TipoMaterial
from functions.database import get_db_session, MaterialVendido, Militante, TipoMaterial
from functions.decorators import require_login
from utils.date_utils import validar_data, converter_data
from datetime import datetime
@@ -11,7 +11,7 @@ material_bp = Blueprint('material', __name__)
@require_login
def listar():
"""Lista todos os materiais com controle de permissões no nível de dados"""
db = get_db_connection()
db = get_db_session()
try:
# SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões
materiais = []
@@ -46,6 +46,7 @@ def listar():
def novo():
"""Cria um novo material vendido"""
if request.method == "POST":
db = get_db_session()
try:
militante_id = request.form.get("militante_id")
tipo_material_id = request.form.get("tipo_material_id")
@@ -57,7 +58,6 @@ def novo():
flash('Data de venda inválida ou futura', 'danger')
return redirect(url_for('material.novo'))
db = get_db_connection()
material = MaterialVendido(
militante_id=militante_id,
tipo_material_id=tipo_material_id,
@@ -77,7 +77,7 @@ def novo():
db.close()
# GET - Renderizar formulário
db = get_db_connection()
db = get_db_session()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_material = db.query(TipoMaterial).order_by(TipoMaterial.descricao).all()
@@ -89,7 +89,7 @@ def novo():
@require_login
def editar(id):
"""Edita um material vendido"""
db = get_db_connection()
db = get_db_session()
try:
material = db.query(MaterialVendido).get(id)
if not material:
@@ -122,7 +122,7 @@ def editar(id):
@require_login
def excluir(id):
"""Exclui um material vendido"""
db = get_db_connection()
db = get_db_session()
try:
material = db.query(MaterialVendido).get(id)
if not material:
@@ -146,7 +146,7 @@ def excluir(id):
@require_login
def listar_tipos():
"""Lista todos os tipos de materiais com controle de permissões no nível de dados"""
db = get_db_connection()
db = get_db_session()
try:
# SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões
tipos_materiais = []
@@ -174,10 +174,10 @@ def listar_tipos():
def novo_tipo():
"""Cria um novo tipo de material"""
if request.method == "POST":
db = get_db_session()
try:
descricao = request.form.get("descricao")
db = get_db_connection()
tipo = TipoMaterial(descricao=descricao)
db.add(tipo)
db.commit()
@@ -196,7 +196,7 @@ def novo_tipo():
@require_login
def editar_tipo(id):
"""Edita um tipo de material"""
db = get_db_connection()
db = get_db_session()
try:
tipo = db.query(TipoMaterial).get(id)
if not tipo:
@@ -222,7 +222,7 @@ def editar_tipo(id):
@require_login
def excluir_tipo(id):
"""Exclui um tipo de material"""
db = get_db_connection()
db = get_db_session()
try:
tipo = db.query(TipoMaterial).get(id)
if not tipo:

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify
from functions.database import get_db_connection, Militante, EmailMilitante, Endereco, Celula, Setor, ComiteRegional
from functions.database import get_db_session, Militante, EmailMilitante, Endereco, Celula, Setor, ComiteRegional
from functions.decorators import require_login
from functions.validations import validar_cpf
from functions.rbac import Permission
@@ -14,6 +14,7 @@ militante_bp = Blueprint('militante', __name__)
@require_login
def criar():
"""Cria um novo militante"""
db = get_db_session()
try:
data = request.get_json()
@@ -29,9 +30,7 @@ def criar():
'status': 'error',
'message': 'CPF inválido'
}), 400
db = get_db_connection()
# Verificar se CPF já existe
if db.query(Militante).filter_by(cpf=data['cpf']).first():
return jsonify({
@@ -104,7 +103,7 @@ def criar():
@require_login
def listar():
"""Lista todos os militantes com controle de permissões no nível de dados"""
db = get_db_connection()
db = get_db_session()
try:
# SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões
militantes = []
@@ -182,7 +181,7 @@ def listar():
@require_login
def excluir(id):
"""Exclui um militante"""
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).get(id)
if not militante:
@@ -211,10 +210,9 @@ def excluir(id):
@require_login
def editar(militante_id):
"""Edita um militante existente"""
db = get_db_session()
try:
data = request.get_json()
db = get_db_connection()
militante = db.query(Militante).get(militante_id)
if not militante:
@@ -283,7 +281,7 @@ def editar(militante_id):
@require_login
def buscar_dados(militante_id):
"""Busca os dados de um militante específico"""
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).options(
joinedload(Militante.emails),
@@ -359,7 +357,7 @@ def buscar_dados(militante_id):
@require_login
def get_setores(cr_id):
"""Retorna setores de um CR específico"""
db = get_db_connection()
db = get_db_session()
try:
setores = db.query(Setor).filter_by(cr_id=cr_id).all()
return jsonify([{'id': s.id, 'nome': s.nome} for s in setores])

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify
from functions.database import get_db_connection, Pagamento, Militante, TipoPagamento
from functions.database import get_db_session, Pagamento, Militante, TipoPagamento
from functions.decorators import require_login
from utils.date_utils import validar_data, converter_data
from datetime import datetime
@@ -12,6 +12,7 @@ pagamento_bp = Blueprint('pagamento', __name__)
def novo():
"""Cria um novo pagamento"""
if request.method == "POST":
db = get_db_session()
try:
militante_id = request.form.get("militante_id")
tipo_pagamento_id = request.form.get("tipo_pagamento_id")
@@ -22,7 +23,6 @@ def novo():
flash('Data de pagamento inválida ou futura', 'danger')
return redirect(url_for('pagamento.novo'))
db = get_db_connection()
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento_id=tipo_pagamento_id,
@@ -41,7 +41,7 @@ def novo():
db.close()
# GET - Renderizar formulário
db = get_db_connection()
db = get_db_session()
try:
militantes = db.query(Militante).order_by(Militante.nome).all()
tipos_pagamento = db.query(TipoPagamento).order_by(TipoPagamento.descricao).all()
@@ -53,7 +53,7 @@ def novo():
@require_login
def listar():
"""Lista todos os pagamentos com controle de permissões no nível de dados"""
db = get_db_connection()
db = get_db_session()
try:
# SEMPRE renderizar o template, mas filtrar os dados baseado nas permissões
pagamentos = []
@@ -88,13 +88,13 @@ def listar():
def adicionar():
"""Adiciona um novo pagamento"""
if request.method == "POST":
db = get_db_session()
try:
militante_id = request.form.get("militante_id")
tipo_pagamento = request.form.get("tipo_pagamento")
valor = float(request.form.get("valor"))
data_pagamento = converter_data(request.form.get("data_pagamento"))
db = get_db_connection()
pagamento = Pagamento(
militante_id=militante_id,
tipo_pagamento=tipo_pagamento,
@@ -116,7 +116,7 @@ def adicionar():
@require_login
def list_pagamentos_celula(celula_id):
"""Lista pagamentos de uma célula específica"""
db = get_db_connection()
db = get_db_session()
try:
pagamentos = db.query(Pagamento).filter_by(celula_id=celula_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
@@ -127,7 +127,7 @@ def list_pagamentos_celula(celula_id):
@require_login
def list_pagamentos_setor(setor_id):
"""Lista pagamentos de um setor específico"""
db = get_db_connection()
db = get_db_session()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
@@ -138,7 +138,7 @@ def list_pagamentos_setor(setor_id):
@require_login
def list_pagamentos_cr(cr_id):
"""Lista pagamentos de um CR específico"""
db = get_db_connection()
db = get_db_session()
try:
pagamentos = db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all()
return render_template('pagamentos/list.html', pagamentos=pagamentos)
@@ -150,7 +150,7 @@ def list_pagamentos_cr(cr_id):
def novo_pagamento_celula(celula_id):
"""Cria novo pagamento para uma célula"""
if request.method == 'POST':
db = get_db_connection()
db = get_db_session()
try:
pagamento = Pagamento(
valor=request.form['valor'],
@@ -171,7 +171,7 @@ def novo_pagamento_celula(celula_id):
def novo_pagamento_setor(setor_id):
"""Cria novo pagamento para um setor"""
if request.method == 'POST':
db = get_db_connection()
db = get_db_session()
try:
pagamento = Pagamento(
valor=request.form['valor'],
@@ -192,7 +192,7 @@ def novo_pagamento_setor(setor_id):
def novo_pagamento_cr(cr_id):
"""Cria novo pagamento para um CR"""
if request.method == 'POST':
db = get_db_connection()
db = get_db_session()
try:
pagamento = Pagamento(
valor=request.form['valor'],

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, request, render_template, redirect, url_for, flash, jsonify
from functions.database import get_db_connection, Usuario, Role, Setor
from functions.database import get_db_session, Usuario, Role, Setor
from functions.decorators import require_login
from flask_login import current_user
import pyotp
@@ -18,7 +18,7 @@ def novo():
setor_id = request.form.get("setor_id")
# Verificar se usuário já existe
db = get_db_connection()
db = get_db_session()
try:
if db.query(Usuario).filter_by(username=username).first():
flash('Nome de usuário já existe.', 'danger')
@@ -45,7 +45,7 @@ def novo():
finally:
db.close()
db = get_db_connection()
db = get_db_session()
try:
roles = db.query(Role).order_by(Role.nome).all()
setores = db.query(Setor).order_by(Setor.nome).all()
@@ -63,7 +63,7 @@ def toggle_status(user_id):
'error': 'Você não tem permissão para alterar o status de usuários.'
}), 403
db = get_db_connection()
db = get_db_session()
try:
usuario = db.query(Usuario).get(user_id)
if not usuario:
@@ -105,7 +105,7 @@ def alterar_nivel(user_id):
'error': 'Novo nível não especificado.'
}), 400
db = get_db_connection()
db = get_db_session()
try:
usuario = db.query(Usuario).get(user_id)
if not usuario:
@@ -150,7 +150,7 @@ def toggle_quadro_orientador(user_id):
'error': 'Você não tem permissão para alterar responsabilidades de usuários.'
}), 403
db = get_db_connection()
db = get_db_session()
try:
usuario = db.query(Usuario).get(user_id)
if not usuario:

View File

@@ -1,5 +0,0 @@
# Netscape HTTP Cookie File
# https://curl.se/docs/http-cookies.html
# This file was generated by libcurl! Edit at your own risk.
#HttpOnly_localhost FALSE / FALSE 0 session .eJw9jjsOgzAUBO_iOoWfP9jmMuh91goFFBCqKHePJZR0u5op5u2WfuB8uvl1XHi4ZTU3OyTNpimqxySBhYIVTqa5VSqxdVWLnRoIOXVvk3ijGFS5gnrTVOEDvPUIIkFEz8VIpNZUWKmVwAryYqTFwwJbVKmD-NzEuIkbIdeJ466hcddzYdvW_df5p3TvnTcM9XY-XwnBQXY.aGPhDw.BUcsxy5unEUB2pJjMnJy9ITNKXs

View File

@@ -1,171 +0,0 @@
from functions.database import init_database, Usuario, Role, get_db_connection
import qrcode
import os
from pathlib import Path
import pyotp
def generate_qr_code(user):
"""
Gera o QR code para um usuário específico
Args:
user: Instância do modelo Usuario
Returns:
tuple: (caminho do arquivo, URI do OTP)
"""
# Tentar diferentes caminhos para salvar o QR code
qr_paths = [
Path('/tmp/admin_qr.png'), # Diretório temporário do sistema
Path('admin_qr.png'), # Diretório atual
Path('/app/admin_qr.png') # Diretório da aplicação
]
# Gerar e salvar QR Code
qr = qrcode.QRCode(version=1, box_size=10, border=5)
# Gerar URI do OTP
totp = pyotp.TOTP(user.otp_secret)
otp_uri = totp.provisioning_uri(
name=user.username,
issuer_name="Sistema de Controles"
)
qr.add_data(otp_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
# Tentar salvar em diferentes locais
qr_saved = False
saved_path = None
for qr_path in qr_paths:
try:
# Tentar salvar o arquivo
img.save(str(qr_path))
print(f"QR Code salvo em: {qr_path}")
qr_saved = True
saved_path = qr_path
break
except Exception as e:
print(f"Não foi possível salvar o QR code em {qr_path}: {e}")
continue
if not qr_saved:
print("AVISO: Não foi possível salvar o QR code em nenhum local")
print("O QR code pode ser gerado manualmente usando o URI OTP")
saved_path = None
return saved_path, otp_uri
def create_admin_user():
"""Cria ou atualiza o usuário admin"""
try:
# Inicializar banco de dados
init_database()
# Criar sessão
db = get_db_connection()
try:
# Verificar se já existe um usuário admin
admin = db.query(Usuario).filter_by(username="admin").first()
if admin:
print("\n=== Usuário Admin Encontrado ===")
if not admin.otp_secret:
print("Gerando novo segredo OTP...")
admin.generate_otp_secret()
db.commit()
else:
print("\n=== Criando Novo Usuário Admin ===")
# Criar novo usuário admin
admin = Usuario(
username="admin",
email="admin@example.com",
is_admin=True
)
admin.set_password("admin123")
admin.generate_otp_secret()
# Buscar ou criar role de admin
admin_role = db.query(Role).filter_by(nome="admin").first()
if not admin_role:
admin_role = Role(nome="admin", nivel=0) # Nível 0 é o mais alto
db.add(admin_role)
# Adicionar role ao usuário
admin.roles.append(admin_role)
# Adicionar e fazer commit
db.add(admin)
db.commit()
# Gerar QR code
qr_path, otp_uri = generate_qr_code(admin)
if qr_path:
print("\n=== QR Code Gerado ===")
print(f"QR Code salvo em: {qr_path}")
print(f"URI do OTP: {otp_uri}")
else:
print("\n=== QR Code Não Pode Ser Salvo ===")
print("Use o URI OTP para configuração manual:")
print(f"URI do OTP: {otp_uri}")
# Mostrar informações
print("\n=== Informações do Admin ===")
print(f"Username: {admin.username}")
print(f"Email: {admin.email}")
print(f"Senha: admin123")
print(f"Segredo OTP: {admin.otp_secret}")
# Gerar código atual para verificação
totp = pyotp.TOTP(admin.otp_secret)
current_code = totp.now()
print("\n=== Verificação do OTP ===")
print(f"Código OTP atual: {current_code}")
print(f"Verificação do código: {totp.verify(current_code)}")
print("\n=== Instruções para Configuração ===")
print("1. Instale um aplicativo autenticador no seu celular")
print(" (Google Authenticator, Microsoft Authenticator, etc)")
print("2. Abra o aplicativo")
print("3. Selecione a opção para adicionar uma nova conta")
if qr_path:
print("4. Escaneie o QR Code salvo em:", qr_path)
print("\nOU configure manualmente:")
print(f"- Nome da conta: {admin.username}")
print(f"- Segredo: {admin.otp_secret}")
print("- Tipo: Baseado em tempo (TOTP)")
print("- Algoritmo: SHA1")
print("- Dígitos: 6")
print("- Intervalo: 30 segundos")
# Verificação final
print("\n=== Teste de Verificação ===")
test_code = totp.now()
print(f"Código de teste: {test_code}")
is_valid = admin.verify_otp(test_code)
print(f"Verificação do código: {'Sucesso' if is_valid else 'Falha'}")
if not is_valid:
print("\nALERTA: Verificação do OTP falhou!")
print("Por favor, verifique se o segredo OTP está correto.")
# Fazer commit final para garantir que tudo foi salvo
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
db.close()
except Exception as e:
print(f"\nErro durante a execução: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
create_admin_user()

View File

@@ -0,0 +1,54 @@
services:
# Redis Cache Service
redis:
image: redis:7-alpine
container_name: controles_redis
volumes:
- redis_data:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
restart: unless-stopped
healthcheck:
test: [ "CMD", "redis-cli", "ping" ]
interval: 30s
timeout: 10s
retries: 3
networks:
- controles_network
# Flask Application
app:
build: .
container_name: controles_app
ports:
- "5000:5000"
environment:
- FLASK_APP=app.py
- FLASK_ENV=production
- REDIS_URL=redis://redis:6379/0
- DATABASE_URL=sqlite:////data/database.db
# DEV apenas para facilitar testes, não deve ser usado em produção
- ADMIN_OTP_SECRET=JBSWY3DPEHPK3PXP
# Produção definir em .env
#- ADMIN_OTP_SECRET=${ADMIN_OTP_SECRET}
volumes:
- app_data:/data
- app_logs:/app/logs
read_only: true
tmpfs:
- /tmp
security_opt:
- no-new-privileges:true
cap_drop:
- ALL
depends_on:
redis:
condition: service_healthy
restart: unless-stopped
networks:
- controles_network
networks:
controles_network:
driver: bridge

View File

@@ -1,18 +1,14 @@
version: '3.8'
services:
# Redis Cache Service
redis:
image: redis:7-alpine
container_name: controles_redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
test: [ "CMD", "redis-cli", "ping" ]
interval: 30s
timeout: 10s
retries: 3
@@ -21,7 +17,11 @@ services:
# Flask Application
app:
build: .
build:
context: .
args:
APP_UID: ${APP_UID:-1000}
APP_GID: ${APP_GID:-1000}
container_name: controles_app
ports:
- "5000:5000"
@@ -29,11 +29,23 @@ services:
- FLASK_APP=app.py
- FLASK_ENV=production
- REDIS_URL=redis://redis:6379/0
- DATABASE_URL=sqlite:///app/database.db
- DATABASE_URL=sqlite:////data/database.db
# DEV apenas para facilitar testes, não deve ser usado em produção
- ADMIN_OTP_SECRET=JBSWY3DPEHPK3PXP
# Produção definir em .env
#- ADMIN_OTP_SECRET=${ADMIN_OTP_SECRET}
volumes:
- ./database.db:/app/database.db
- ./admin_qr.png:/app/admin_qr.png
- ./data:/data
- ./logs:/app/logs
read_only: true
tmpfs:
- /tmp
security_opt:
- no-new-privileges:true
cap_drop:
- ALL
depends_on:
redis:
condition: service_healthy
@@ -47,4 +59,4 @@ volumes:
networks:
controles_network:
driver: bridge
driver: bridge

View File

@@ -2,26 +2,41 @@
Sistema de gerenciamento para a Organização Comunista Internacionalista (OCI) com controle de militantes, cotas, pagamentos e materiais.
## 🚀 Status Atual
## Trilha Recomendada de Leitura
**Sistema com Arquitetura de Permissões Corrigida**
- Aplicação Flask rodando com Docker
- Redis cache integrado e funcionando
- Banco de dados SQLite inicializado
- Usuário admin configurado com OTP
- 30 militantes de teste criados
- Estrutura organizacional completa
- **Sistema de permissões implementado no nível de dados**
- **Menus sempre visíveis, controle transparente**
1. `docs/architecture_summary.md`
2. `docs/rbac.md`
3. `docs/permission_strategy.md`
4. `docs/redis_cache_setup.md`
5. `docs/permission_fixes_summary.md`
## 🎯 Arquitetura de Permissões
## Índice por Tema
O sistema implementa uma estratégia de controle de permissões no **nível de dados**, garantindo que:
### Arquitetura
- **Menus permanecem sempre visíveis** - Não há restrições na interface
- **Dados são filtrados por hierarquia** - Admin → CC → CR → Setor → Célula
- **Templates nunca quebram** - Sempre renderizam, mesmo com dados vazios
- **Tesoureiros têm poder adequado** - Podem fazer tudo que secretários fazem
- `docs/architecture_summary.md`: visão geral do estado da arquitetura.
- `docs/mvc_refactoring.md`: detalhes da refatoração MVC.
### Permissões e Segurança de Acesso
- `docs/rbac.md`: níveis de papel e herança de permissões.
- `docs/permission_strategy.md`: estratégia de filtragem de dados e uso em templates.
- `docs/permission_fixes_summary.md`: resumo das correções aplicadas em permissões.
### Infra e Performance
- `docs/redis_cache_setup.md`: configuração e uso de cache Redis.
### Histórico Técnico
- `docs/alteracoes_db_connection.md`: alterações no gerenciamento de conexão/sessão de banco.
## Como Manter Esta Pasta Organizada
- Preferir um arquivo por assunto (evitar documentos muito amplos).
- Começar cada documento com contexto, problema e decisão.
- Registrar trade-offs e impactos de manutenção.
- Atualizar este índice sempre que um novo documento for criado.
### Diagrama da Arquitetura
@@ -46,87 +61,6 @@ graph TD
J --> K[Always Renders Successfully]
```
## 🏗️ Arquitetura
O sistema foi refatorado seguindo o padrão MVC (Model-View-Controller):
```
controles/
├── app.py # Ponto de entrada da aplicação
├── controllers/ # Controladores (lógica de rotas)
├── models/ # Modelos (operações de banco)
├── services/ # Serviços (lógica de negócio)
├── templates/ # Views (templates HTML)
├── static/ # Assets estáticos
└── functions/ # Funções utilitárias
```
## 🐳 Docker Setup
### Pré-requisitos
- Docker e Docker Compose instalados
- Porta 5000 disponível para a aplicação
- Porta 6379 disponível para Redis
### Inicialização Rápida
```bash
# Clonar o repositório
git clone <repository-url>
cd controles
# Iniciar o ambiente completo
make dev-up
# Verificar status
docker-compose ps
# Ver logs
make docker-logs
```
### Comandos Úteis
```bash
# Iniciar serviços
make dev-up
# Parar serviços
make dev-down
# Ver logs
make docker-logs
# Status do cache Redis
make cache-status
# Limpar cache
make cache-clear
# Reconstruir containers
make docker-build
```
## 🔐 Acesso ao Sistema
### Credenciais do Admin
- **URL**: http://localhost:5000
- **Usuário**: admin
- **Senha**: admin123
- **OTP Secret**: JBSWY3DPEHPK3PXP
### Configuração OTP
1. Instale um aplicativo autenticador (Google Authenticator, Microsoft Authenticator)
2. Configure manualmente:
- Nome: admin
- Segredo: JBSWY3DPEHPK3PXP
- Tipo: TOTP
- Algoritmo: SHA1
- Dígitos: 6
- Intervalo: 30 segundos
**OU** use o QR Code gerado em `/tmp/admin_qr.png` dentro do container.
## 📊 Funcionalidades
### Gestão de Militantes
@@ -152,80 +86,6 @@ make docker-build
- Relatórios de vendas
- Relatórios de pagamentos
## 🗄️ Banco de Dados
### Estrutura
- **SQLite** com SQLAlchemy ORM
- **Redis** para cache de performance
- Migrações automáticas
- Dados de teste incluídos
### Inicialização
O banco é inicializado automaticamente no primeiro startup com:
- 30 militantes de teste
- Estrutura organizacional completa
- Tipos de pagamento e materiais
- Usuário admin configurado
## 🔧 Tecnologias
- **Backend**: Flask 2.3.3
- **Frontend**: Bootstrap 5, HTML5, CSS3, JavaScript
- **Database**: SQLite + SQLAlchemy 2.0.21
- **Cache**: Redis 7.4.4
- **Authentication**: Flask-Login + OTP (pyotp)
- **Container**: Docker + Docker Compose
- **Server**: Gunicorn
## 📁 Estrutura de Arquivos
```
controles/
├── app.py # Aplicação principal
├── controllers/ # Controladores MVC
│ ├── auth_controller.py # Autenticação
│ ├── home_controller.py # Dashboard
│ ├── militante_controller.py # Militantes
│ ├── pagamento_controller.py # Pagamentos
│ ├── cota_controller.py # Cotas
│ └── usuario_controller.py # Usuários
├── models/ # Modelos de dados
├── services/ # Serviços de negócio
├── templates/ # Templates HTML
├── static/ # Assets estáticos
├── functions/ # Funções utilitárias
├── docs/ # Documentação
├── docker-compose.yml # Configuração Docker
├── Dockerfile # Imagem Docker
└── requirements.txt # Dependências Python
```
## 🚨 Problemas Resolvidos
### ✅ QR Code Admin
- **Problema**: Erro de permissão ao salvar QR code
- **Solução**: Múltiplos caminhos de fallback, salvamento em `/tmp/`
### ✅ Conexão Redis
- **Problema**: Falhas de conexão durante startup
- **Solução**: Retry logic com backoff exponencial
### ✅ Método OTP
- **Problema**: Método `generate_otp_secret` ausente
- **Solução**: Implementado na classe Usuario
### ✅ Rede Docker
- **Problema**: Serviços não se comunicavam
- **Solução**: Configuração explícita de redes
### ✅ Segredo OTP Inválido
- **Problema**: Segredo OTP não estava em formato base32 válido
- **Solução**: Alterado para `JBSWY3DPEHPK3PXP` (formato base32 válido)
### ✅ Verificação de Arquivo QR Code
- **Problema**: `PermissionError` ao verificar existência do arquivo
- **Solução**: Removida verificação de existência, implementado sistema de fallback
## 📈 Performance
### Cache Redis
@@ -235,6 +95,7 @@ controles/
- API responses: Variável
### Monitoramento
```bash
# Status do cache
make cache-status
@@ -246,68 +107,6 @@ make docker-logs
docker-compose logs redis
```
## 🔍 Troubleshooting
### Problemas Comuns
1. **Redis não conecta**
```bash
docker-compose logs redis
docker-compose restart redis
```
2. **Aplicação não inicia**
```bash
docker-compose logs app
docker-compose down && docker-compose up -d
```
3. **Cache não funciona**
```bash
make cache-status
make cache-clear
```
4. **Erro de OTP**
```bash
# Verificar se o segredo está correto
echo "JBSWY3DPEHPK3PXP" | base32 -d
```
5. **Erro de permissão QR Code**
```bash
# O QR code agora salva em /tmp/admin_qr.png
# Se não conseguir salvar, use configuração manual
```
### Logs
- Aplicação: `logs/controles.log`
- Cache: `logs/cache.log`
- Docker: `docker-compose logs`
## 📚 Documentação
- [Arquitetura MVC](docs/mvc_refactoring.md)
- [Sistema RBAC](docs/rbac.md)
- [Cache Redis](docs/redis_cache_setup.md)
- [Resumo da Arquitetura](docs/architecture_summary.md)
## 🤝 Contribuição
1. Fork o projeto
2. Crie uma branch para sua feature
3. Commit suas mudanças
4. Push para a branch
5. Abra um Pull Request
## 📄 Licença
Este projeto é privado para uso da OCI.
## 📞 Suporte
Para suporte técnico, entre em contato com a equipe de desenvolvimento.
## 📋 Recommended Next Steps
### High Priority

View File

@@ -1,17 +1,17 @@
from sqlalchemy import create_engine
import os
from sqlalchemy import create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from pathlib import Path
import os
# Configurar caminho do banco de dados
db_dir = Path.home() / '.local' / 'share' / 'controles'
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / 'database.db'
db_path = Path(__file__).resolve().parents[1] / 'data' / 'database.db'
db_path.parent.mkdir(parents=True, exist_ok=True)
db_fallback = f'sqlite:///{db_path}'
# Configurar SQLite com opções para melhor concorrência
engine = create_engine(
f'sqlite:///{db_path}',
os.environ.get('DATABASE_URL', db_fallback),
connect_args={
'timeout': 30, # Tempo de espera em segundos
'check_same_thread': False # Permite acesso de múltiplas threads
@@ -23,11 +23,15 @@ engine = create_engine(
Session = sessionmaker(bind=engine)
Base = declarative_base()
def get_db_connection():
"""Retorna uma nova sessão do banco de dados"""
session = Session()
def get_db_session():
"""Retorna uma nova sessão do banco de dados com PRAGMAs configuradas"""
db_session = Session()
try:
return session
# Configurar SQLite para melhor tratamento de concorrência
db_session.execute(text("PRAGMA journal_mode=WAL"))
db_session.execute(text("PRAGMA busy_timeout=5000"))
return db_session
except Exception as e:
session.rollback()
raise e
db_session.rollback()
db_session.close()
raise e

View File

@@ -1,10 +1,10 @@
from datetime import datetime, UTC
from sqlalchemy.exc import SQLAlchemyError
from functions.database import get_db_connection, Controle as ControleModel
from functions.database import get_db_session, Controle as ControleModel
class Controle:
def __init__(self):
self.db = get_db_connection()
self.db = get_db_session()
def registrar_controle(self, militante_id: int, tipo: str, valor: float, observacao: str = None) -> bool:
"""

View File

@@ -1,57 +1,31 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric, Date, Enum, create_engine, text
from sqlalchemy.orm import sessionmaker, relationship, backref
import os
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric, Date, Enum
from sqlalchemy.orm import relationship, backref
import pyotp
from pathlib import Path
from sqlalchemy.pool import NullPool
import secrets
from flask_mail import Message
from flask import url_for
import enum
from flask_login import UserMixin
from .rbac import Role, Permission, role_permissions, user_roles
from .base import Base, engine, Session
import logging
from .rbac import Role
from .base import Base, get_db_session
# Configurar caminho do banco de dados
db_dir = Path.home() / '.local' / 'share' / 'controles'
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / 'database.db'
DATABASE_URL = f"sqlite:///{db_path}"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db_connection():
"""Retorna uma nova sessão do banco de dados"""
Session = sessionmaker(bind=engine)
db = Session()
try:
# Configurar SQLite para melhor tratamento de concorrência
db.execute(text("PRAGMA journal_mode=WAL"))
db.execute(text("PRAGMA busy_timeout=5000"))
return db
except:
db.close()
raise
def execute_query(query, params=None):
"""
Executa uma query usando SQLAlchemy
"""
session = get_db_connection()
db = get_db_session()
try:
result = session.execute(query, params)
session.commit()
result = db.execute(query, params)
db.commit()
return result
except Exception as e:
session.rollback()
db.rollback()
raise e
finally:
session.close()
db.close()
class EstadoMilitante(enum.Enum):
ATIVO = 'ativo'
@@ -174,7 +148,7 @@ class Militante(Base):
quadro_orientador = Column(Boolean, default=False)
# Campos para Aspirante
aspirante = Column(Boolean, default=True) # Por padrão, todo novo militante é aspirante
data_inicio_aspirante = Column(DateTime, default=datetime.utcnow)
data_inicio_aspirante = Column(DateTime, default=datetime.now(UTC))
avaliacao_aspirante = Column(Text)
data_avaliacao_aspirante = Column(DateTime)
@@ -277,7 +251,7 @@ class Militante(Base):
def generate_username(self):
"""Gera um nome de usuário único baseado no primeiro nome e um código"""
from sqlalchemy import func
db = get_db_connection()
db = get_db_session()
try:
# Pega o primeiro nome
primeiro_nome = self.nome.split()[0].lower()
@@ -454,7 +428,7 @@ class Usuario(Base, UserMixin):
celula_id = Column(Integer, ForeignKey('celulas.id'))
session_timeout = Column(Integer, default=30)
tipo = Column(String(17), nullable=False)
ultima_atividade = Column(DateTime, default=datetime.utcnow)
ultima_atividade = Column(DateTime, default=datetime.now(UTC))
# Relacionamento com militante
militante_id = Column(Integer, ForeignKey('militantes.id'))
militante = relationship("Militante", backref=backref("usuario", uselist=False))
@@ -473,7 +447,7 @@ class Usuario(Base, UserMixin):
self.ativo = True
self.session_timeout = 30
self.tipo = "USUARIO"
self.ultima_atividade = datetime.utcnow()
self.ultima_atividade = datetime.now(UTC)
def set_password(self, password):
self.password_hash = generate_password_hash(password)
@@ -482,23 +456,24 @@ class Usuario(Base, UserMixin):
return check_password_hash(self.password_hash, password)
def update_last_activity(self):
self.ultima_atividade = datetime.utcnow()
self.ultima_atividade = datetime.now(UTC)
def is_session_expired(self):
if not self.ultima_atividade:
return True
time_diff = datetime.utcnow() - self.ultima_atividade
time_diff = datetime.now(UTC) - self.ultima_atividade
return time_diff.total_seconds() > (self.session_timeout * 60)
def check_session_timeout(self):
"""Verifica se a sessão do usuário expirou"""
if not self.ultima_atividade:
return True
time_diff = datetime.utcnow() - self.ultima_atividade
time_diff = datetime.now(UTC) - self.ultima_atividade
return time_diff.total_seconds() > (self.session_timeout * 60)
def has_permission(self, permission_name):
"""Verifica se o usuário tem uma permissão específica"""
# TODO: (talvez) remover, confirmar admin por RBAC
if self.is_admin: # Se for admin, tem todas as permissões
return True
@@ -510,54 +485,66 @@ class Usuario(Base, UserMixin):
return False
def has_role(self, role_nivel):
"""Verifica se o usuário tem um determinado nível de role"""
"""Verifica se o usuário tem um nível de role específico."""
for role in self.roles:
if role.nivel == role_nivel:
return True
return False
def get_otp_uri(self):
"""Gera a URI para autenticação em duas etapas"""
if not self.otp_secret:
self.otp_secret = pyotp.random_base32()
return pyotp.totp.TOTP(self.otp_secret).provisioning_uri(
self.username,
issuer_name="Sistema de Controles"
)
def get_highest_role(self):
"""Retorna a role de maior nível do usuário."""
if not self.roles:
return None
return max(self.roles, key=lambda role: role.nivel)
def has_minimum_role(self, min_level):
"""Verifica se o usuário possui ao menos o nível informado."""
highest_role = self.get_highest_role()
return bool(highest_role and highest_role.nivel >= min_level)
def generate_otp_secret(self):
"""Gera um novo segredo OTP para o usuário"""
self.otp_secret = pyotp.random_base32()
return self.otp_secret
def get_otp_uri(self):
"""Gera a URI para autenticação em duas etapas"""
if not self.otp_secret:
raise ValueError(f"OTP não configurado para {self.username}")
totp = pyotp.TOTP(self.otp_secret)
return totp.provisioning_uri(
name=self.username,
issuer_name="Sistema de Controles"
)
def verify_otp(self, code):
"""Verifica se um código OTP é válido"""
if not self.otp_secret:
print(f"Erro: OTP secret não configurado para o usuário {self.username}")
return False
raise ValueError(f"Erro: OTP secret não configurado para o usuário {self.username}")
print(f"Verificando OTP para usuário {self.username}")
print(f"OTP Secret: {self.otp_secret}")
print(f"Código fornecido: {code}")
totp = pyotp.totp.TOTP(self.otp_secret)
totp = pyotp.TOTP(self.otp_secret)
is_valid = totp.verify(code)
print(f"Resultado da verificação: {'Válido' if is_valid else 'Inválido'}")
print(f"Tempo atual: {datetime.utcnow()}")
print(f"Período atual: {totp.timecode(datetime.utcnow())}")
print(f"Tempo atual: {datetime.now(UTC)}")
print(f"Período atual: {totp.timecode(datetime.now(UTC))}")
return is_valid
def logout(self):
"""Registra o logout do usuário"""
self.ultimo_logout = datetime.utcnow()
self.ultimo_logout = datetime.now(UTC)
self.motivo_logout = "Logout manual"
self.ultima_atividade = None
def is_admin_user(self):
"""Verifica se o usuário é admin"""
return self.is_admin or any(role.nome == "admin" for role in self.roles)
return self.is_admin or any(role.nivel == Role.SECRETARIO_GERAL for role in self.roles)
class PagamentoCelula(Base):
__tablename__ = 'pagamentos_celula'
@@ -630,116 +617,3 @@ class TransacaoPIX(Base):
pagamento_id = Column(Integer, ForeignKey('pagamentos.id'))
pagamento = relationship("Pagamento", back_populates="transacoes_pix")
def init_database():
"""Inicializa o banco de dados com dados básicos"""
print("Inicializando banco de dados...")
session = get_db_connection()
try:
# Criar todas as tabelas
Base.metadata.drop_all(engine) # Remover todas as tabelas existentes
Base.metadata.create_all(engine)
# Criar roles padrão
roles = [
("Administrador", Role.SECRETARIO_GERAL),
("Secretário", Role.SECRETARIO_CELULA),
("Militante", Role.MILITANTE_BASICO)
]
for nome, nivel in roles:
if not session.query(Role).filter_by(nome=nome).first():
role = Role(nome=nome, nivel=nivel)
session.add(role)
session.commit()
# Criar setores padrão
setores = ["Setor 1", "Setor 2", "Setor 3"]
for nome in setores:
if not session.query(Setor).filter_by(nome=nome).first():
setor = Setor(nome=nome)
session.add(setor)
session.commit()
# Criar comitês padrão
comites = ["Comitê 1", "Comitê 2", "Comitê 3"]
for nome in comites:
if not session.query(ComiteCentral).filter_by(nome=nome).first():
comite = ComiteCentral(nome=nome)
session.add(comite)
session.commit()
# Gerar OTP para admin
admin_otp_secret = os.environ.get('ADMIN_OTP_SECRET') or pyotp.random_base32()
print(f"OTP do admin: {admin_otp_secret}")
# Criar usuário admin
admin_role = session.query(Role).filter_by(nome="Administrador").first()
setor = session.query(Setor).first()
admin = Usuario(
username="admin",
email="admin@example.com",
is_admin=True
)
admin.set_password("admin123")
admin.tipo = "ADMIN"
admin.otp_secret = admin_otp_secret
admin.roles.append(admin_role)
admin.setor = setor
session.add(admin)
session.commit()
# Gerar QR code
totp = pyotp.totp.TOTP(admin_otp_secret)
provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles")
import qrcode
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
# Tentar salvar em diferentes locais
qr_paths = ['/tmp/admin_qr.png', 'admin_qr.png', '/app/admin_qr.png']
qr_saved = False
for qr_path in qr_paths:
try:
img.save(qr_path)
print(f"QR code salvo em {qr_path}")
qr_saved = True
break
except Exception as e:
print(f"Não foi possível salvar o QR code em {qr_path}: {e}")
continue
if not qr_saved:
print("AVISO: Não foi possível salvar o QR code em nenhum local")
print("O QR code pode ser gerado manualmente usando o URI OTP")
print("=== Usuário Admin Criado ===")
print(f"Username: admin")
print(f"Senha: admin123")
print(f"Email: {admin.email}")
print(f"OTP Secret: {admin_otp_secret}")
if qr_saved:
print(f"QR Code: {qr_path}")
print(f"URI OTP: {provisioning_uri}")
# Importar e executar o seed após criar todas as dependências
from seed_data import seed_database
print("\nPopulando banco de dados com dados de teste...")
seed_database()
print("Dados de teste criados com sucesso!")
except Exception as e:
print(f"Erro na inicialização do banco: {e}")
session.rollback()
raise
finally:
session.close()
if __name__ == "__main__":
init_database()

View File

@@ -2,7 +2,7 @@ from functools import wraps
from flask import session, redirect, url_for, flash
from flask_login import current_user, login_required
from sqlalchemy.orm import joinedload
from .database import get_db_connection, Usuario, Role
from .database import get_db_session, Usuario, Role
from .rbac import Permission
def require_login(f):
@@ -26,7 +26,7 @@ def require_permission(permission_name):
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_connection()
db = get_db_session()
try:
# Carregar o usuário com suas roles e permissões
user = db.query(Usuario).options(
@@ -58,8 +58,11 @@ def require_permission(permission_name):
return decorated_function
return decorator
def require_role(role_name):
def require_role(role_level):
"""Decorador para verificar se o usuário tem um papel específico"""
if not isinstance(role_level, int):
raise TypeError("require_role espera um nível numérico (int), use a classe Role.")
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
@@ -67,10 +70,10 @@ def require_role(role_name):
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(current_user.id)
if not user or not user.has_role(role_name):
if not user or not user.has_role(role_level):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
@@ -86,6 +89,9 @@ def require_role(role_name):
def require_minimum_role(min_level):
"""Decorador para verificar se o usuário tem um papel com nível mínimo"""
if not isinstance(min_level, int):
raise TypeError("require_minimum_role espera um nível numérico de role (int).")
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
@@ -93,15 +99,14 @@ def require_minimum_role(min_level):
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('auth.login'))
highest_role = user.get_highest_role()
if not highest_role or highest_role.nivel < min_level:
if not user.has_minimum_role(min_level):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
@@ -146,31 +151,42 @@ def require_instance_access(instance_type, instance_id):
if not current_user.is_authenticated:
flash('Por favor, faça login para acessar esta página.', 'error')
return redirect(url_for('auth.login'))
# Verificar acesso baseado na instância do usuário
if instance_type == 'celula':
if not (current_user.celula_id == instance_id or
current_user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
current_user.has_permission(Permission.VIEW_CR_REPORTS) or
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a esta célula.', 'error')
return redirect(url_for('index'))
elif instance_type == 'setor':
if not (current_user.setor_id == instance_id or
current_user.has_permission(Permission.VIEW_CR_REPORTS) or
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este setor.', 'error')
return redirect(url_for('index'))
elif instance_type == 'cr':
if not (current_user.cr_id == instance_id or
current_user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este CR.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
current_user.update_last_activity()
db_session.commit()
return f(*args, **kwargs)
db = get_db_session()
try:
user = db.query(Usuario).options(
joinedload(Usuario.roles).joinedload(Role.permissions)
).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('auth.login'))
# Verificar acesso baseado na instância do usuário
if instance_type == 'celula':
if not (user.celula_id == instance_id or
user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a esta célula.', 'error')
return redirect(url_for('index'))
elif instance_type == 'setor':
if not (user.setor_id == instance_id or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este setor.', 'error')
return redirect(url_for('index'))
elif instance_type == 'cr':
if not (user.cr_id == instance_id or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este CR.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
return decorator

View File

@@ -133,183 +133,183 @@ class Permission(Base):
def init_rbac():
"""Inicializa o sistema RBAC com roles e permissões básicas"""
from .database import Usuario, get_db_connection
session = get_db_connection()
from .database import Usuario, get_db_session
db = get_db_session()
try:
# Criar role de administrador primeiro
admin_role = session.query(Role).filter_by(nome="Administrador").first()
admin_role = db.query(Role).filter_by(nome="Administrador").first()
if not admin_role:
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL)
session.add(admin_role)
session.commit()
db.add(admin_role)
db.commit()
# Criar outras roles
for nivel, nome in Role.get_roles_list():
if nome != "Administrador": # Pular Administrador pois já foi criado
role = session.query(Role).filter_by(nivel=nivel).first()
role = db.query(Role).filter_by(nivel=nivel).first()
if not role:
role = Role(nome=nome, nivel=nivel)
session.add(role)
db.add(role)
# Criar permissões
for nome, descricao in Permission.get_permissions_list():
permission = session.query(Permission).filter_by(nome=nome).first()
permission = db.query(Permission).filter_by(nome=nome).first()
if not permission:
permission = Permission(nome=nome, descricao=descricao)
session.add(permission)
db.add(permission)
session.commit()
db.commit()
# Dar todas as permissões para o admin
all_permissions = session.query(Permission).all()
all_permissions = db.query(Permission).all()
admin_role.permissions = all_permissions
session.commit()
db.commit()
# Buscar usuário admin e atribuir role de administrador
admin_user = session.query(Usuario).filter_by(username="admin").first()
admin_user = db.query(Usuario).filter_by(username="admin").first()
if admin_user:
if admin_role not in admin_user.roles:
admin_user.roles = [admin_role] # Substituir roles existentes
session.commit()
db.commit()
# Mapear permissões para outros roles
for role in session.query(Role).filter(Role.nome != "Administrador").all():
for role in db.query(Role).filter(Role.nome != "Administrador").all():
# Militante Básico
if role.nivel == Role.MILITANTE_BASICO:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first()
]
# Secretário de Célula
elif role.nivel == Role.SECRETARIO_CELULA:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CELL_PAYMENT).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_CELL_PAYMENT).first()
]
# Membro de Setor
elif role.nivel == Role.MEMBRO_SETOR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first()
]
# Secretário de Setor
elif role.nivel == Role.SECRETARIO_SETOR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first()
]
# Membro de CR
elif role.nivel == Role.MEMBRO_CR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first()
]
# Secretário de CR
elif role.nivel == Role.SECRETARIO_CR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first()
]
# Membro do CC
elif role.nivel == Role.MEMBRO_CC:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first()
]
# Secretário Geral
elif role.nivel == Role.SECRETARIO_GERAL:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CC_CRS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CC_CR).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first(),
session.query(Permission).filter_by(nome=Permission.SYSTEM_CONFIG).first()
db.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CELL_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
db.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(),
db.query(Permission).filter_by(nome=Permission.MANAGE_CC_CRS).first(),
db.query(Permission).filter_by(nome=Permission.CREATE_CC_CR).first(),
db.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first(),
db.query(Permission).filter_by(nome=Permission.SYSTEM_CONFIG).first()
]
session.commit()
db.commit()
except Exception as e:
print(f"Erro ao inicializar RBAC: {e}")
session.rollback()
db.rollback()
raise
finally:
session.close()
db.close()

View File

@@ -1,19 +0,0 @@
from functions.database import init_database
from functions.rbac import init_rbac
from create_admin import create_admin_user
from create_test_users import create_test_users
def init_system():
print("Inicializando banco de dados...")
init_database()
print("Inicializando sistema RBAC...")
init_rbac()
print("Criando usuários iniciais...")
create_admin_user()
create_test_users()
if __name__ == "__main__":
init_system()
print("Sistema inicializado com sucesso!")

View File

@@ -1,58 +0,0 @@
from create_admin import create_admin
from create_test_users import create_test_users
from functions.database import get_db_connection, Usuario
from functions.rbac import Role
def init_system():
print("=== Inicializando Sistema ===")
# Criar admin
print("\nCriando usuário admin...")
create_admin()
# Criar usuários de teste
print("\nCriando usuários de teste...")
create_test_users()
# Verificar configuração
print("\n=== Verificando Configuração ===")
session = get_db_connection()
try:
# Verificar admin
admin = session.query(Usuario).filter_by(username='admin').first()
if admin:
print("Admin: OK")
print(f"OTP configurado: {'Sim' if admin.otp_secret else 'Não'}")
else:
print("Admin: FALHOU")
# Verificar usuários de teste
test_users = ['aligner', 'tester', 'deployer']
for username in test_users:
user = session.query(Usuario).filter_by(username=username).first()
if user:
print(f"{username}: OK")
print(f"OTP configurado: {'Sim' if user.otp_secret else 'Não'}")
else:
print(f"{username}: FALHOU")
print("\n=== Instruções ===")
print("1. Use o aplicativo autenticador para configurar o OTP de cada usuário")
print("2. Faça login com cada usuário para testar")
print("3. Altere a senha no primeiro login")
print("\nCredenciais:")
print("Admin:")
print(" Usuário: admin")
print(" Senha: admin123")
print("\nUsuários de teste:")
print(" Usuário: aligner, tester, deployer")
print(" Senha: Test123!@#")
except Exception as e:
print(f"Erro ao verificar configuração: {str(e)}")
session.rollback()
finally:
session.close()
if __name__ == "__main__":
init_system()

View File

@@ -1,4 +1,4 @@
from functions.database import get_db_connection, Militante, EmailMilitante, Endereco
from functions.database import get_db_session, Militante, EmailMilitante, Endereco
from sqlalchemy.orm import joinedload
from datetime import datetime
from typing import List, Dict, Optional
@@ -14,7 +14,7 @@ class MilitanteModel:
@invalidate_cache_pattern("militantes:*")
def criar_militante(data: Dict) -> Dict:
"""Cria um novo militante"""
db = get_db_connection()
db = get_db_session()
try:
# Criar endereço se fornecido
endereco_id = None
@@ -89,7 +89,7 @@ class MilitanteModel:
@cached(expire=1800, key_prefix="militantes") # Cache for 30 minutes
def listar_militantes() -> List[Militante]:
"""Lista todos os militantes"""
db = get_db_connection()
db = get_db_session()
try:
militantes = db.query(Militante).options(
joinedload(Militante.emails),
@@ -125,7 +125,7 @@ class MilitanteModel:
return cached_militante
# Cache miss, get from database
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).options(
joinedload(Militante.emails),
@@ -150,7 +150,7 @@ class MilitanteModel:
@invalidate_cache_pattern("militantes:*")
def atualizar_militante(militante_id: int, data: Dict) -> Dict:
"""Atualiza um militante existente"""
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).get(militante_id)
@@ -228,7 +228,7 @@ class MilitanteModel:
@invalidate_cache_pattern("militantes:*")
def excluir_militante(militante_id: int) -> Dict:
"""Exclui um militante"""
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).get(militante_id)
if not militante:
@@ -265,7 +265,7 @@ class MilitanteModel:
@cached(expire=1800, key_prefix="militantes")
def buscar_por_cpf(cpf: str) -> Optional[Militante]:
"""Busca um militante por CPF"""
db = get_db_connection()
db = get_db_session()
try:
militante = db.query(Militante).filter_by(cpf=cpf).first()
if militante:

View File

@@ -1,4 +1,4 @@
from functions.database import get_db_connection, Pagamento, Militante, TipoPagamento
from functions.database import get_db_session, Pagamento, Militante, TipoPagamento
from sqlalchemy.orm import joinedload
from datetime import datetime
from typing import List, Dict, Optional
@@ -9,7 +9,7 @@ class PagamentoModel:
@staticmethod
def criar_pagamento(data: Dict) -> Dict:
"""Cria um novo pagamento"""
db = get_db_connection()
db = get_db_session()
try:
pagamento = Pagamento(
militante_id=data['militante_id'],
@@ -38,7 +38,7 @@ class PagamentoModel:
@staticmethod
def listar_pagamentos() -> List[Pagamento]:
"""Lista todos os pagamentos"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).all()
finally:
@@ -47,7 +47,7 @@ class PagamentoModel:
@staticmethod
def buscar_por_id(pagamento_id: int) -> Optional[Pagamento]:
"""Busca um pagamento por ID"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).get(pagamento_id)
finally:
@@ -56,7 +56,7 @@ class PagamentoModel:
@staticmethod
def atualizar_pagamento(pagamento_id: int, data: Dict) -> Dict:
"""Atualiza um pagamento existente"""
db = get_db_connection()
db = get_db_session()
try:
pagamento = db.query(Pagamento).get(pagamento_id)
@@ -90,7 +90,7 @@ class PagamentoModel:
@staticmethod
def excluir_pagamento(pagamento_id: int) -> Dict:
"""Exclui um pagamento"""
db = get_db_connection()
db = get_db_session()
try:
pagamento = db.query(Pagamento).get(pagamento_id)
if not pagamento:
@@ -119,7 +119,7 @@ class PagamentoModel:
@staticmethod
def listar_por_celula(celula_id: int) -> List[Pagamento]:
"""Lista pagamentos de uma célula específica"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).filter_by(celula_id=celula_id).all()
finally:
@@ -128,7 +128,7 @@ class PagamentoModel:
@staticmethod
def listar_por_setor(setor_id: int) -> List[Pagamento]:
"""Lista pagamentos de um setor específico"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).join(Usuario).filter(Usuario.setor_id == setor_id).all()
finally:
@@ -137,7 +137,7 @@ class PagamentoModel:
@staticmethod
def listar_por_cr(cr_id: int) -> List[Pagamento]:
"""Lista pagamentos de um CR específico"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).join(Usuario).filter(Usuario.cr_id == cr_id).all()
finally:
@@ -146,7 +146,7 @@ class PagamentoModel:
@staticmethod
def listar_por_militante(militante_id: int) -> List[Pagamento]:
"""Lista pagamentos de um militante específico"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).filter_by(militante_id=militante_id).order_by(Pagamento.data_pagamento.desc()).all()
finally:
@@ -155,7 +155,7 @@ class PagamentoModel:
@staticmethod
def obter_tipos_pagamento() -> List[TipoPagamento]:
"""Obtém todos os tipos de pagamento"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(TipoPagamento).order_by(TipoPagamento.descricao).all()
finally:
@@ -164,7 +164,7 @@ class PagamentoModel:
@staticmethod
def obter_militantes() -> List[Militante]:
"""Obtém todos os militantes"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Militante).order_by(Militante.nome).all()
finally:

View File

@@ -1,14 +1,19 @@
Flask==2.3.3
Flask-Bootstrap5==0.1.dev1
Flask==3.0.2
Flask-SQLAlchemy==3.1.1
Flask-Login==0.6.3
Flask-WTF==1.1.1
Flask-WTF==1.2.1
Flask-Mail==0.9.1
SQLAlchemy==2.0.21
Werkzeug==2.3.7
python-dotenv==1.0.0
pyotp==2.8.0
SQLAlchemy>=2.0.36
Werkzeug==3.0.1
python-dotenv==1.0.1
pyotp==2.9.0
qrcode==7.4.2
Pillow==10.0.1
redis==5.0.1
Pillow>=10.4.0
email-validator==2.3.0
cryptography==42.0.2
bcrypt==4.1.2
Bootstrap-Flask==2.3.3
PyJWT==2.8.0
gunicorn==21.2.0
faker==19.13.0
Faker==19.13.0
redis==5.0.1

View File

@@ -1,5 +1,5 @@
from flask import Blueprint, render_template, flash, redirect, url_for, request, jsonify
from functions.database import Usuario, get_db_connection
from functions.database import Usuario, get_db_session
from functions.decorators import require_login
from flask_login import login_required, current_user
from sqlalchemy.orm import joinedload
@@ -29,7 +29,7 @@ def admin_required(f):
@admin_required
def dashboard():
"""Dashboard principal da área administrativa com lista de usuários"""
db = get_db_connection()
db = get_db_session()
try:
now = datetime.now()
@@ -68,7 +68,7 @@ def dashboard():
@admin_required
def reset_user_otp(user_id):
"""Reseta o OTP de um usuário"""
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(user_id)
if not user:
@@ -89,7 +89,7 @@ def reset_user_otp(user_id):
@admin_required
def reset_user_password(user_id):
"""Reseta a senha de um usuário"""
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(user_id)
if not user:
@@ -111,7 +111,7 @@ def reset_user_password(user_id):
@admin_required
def toggle_user_status(user_id):
"""Ativa/desativa um usuário"""
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(user_id)
if not user:

198
scripts/create_admin.py Normal file
View File

@@ -0,0 +1,198 @@
import os
import pyotp
from pathlib import Path
from functions.database import Usuario, Role, get_db_session
from services.otp_service import generate_qr_code
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin123"
ADMIN_ROLE = Role.SECRETARIO_GERAL
def salvar_qr_code(user):
"""
Gera o QR code para um usuário específico
Args:
user: Instância do modelo Usuario
Returns:
tuple: (caminho do arquivo, URI do OTP)
"""
# Tentar diferentes caminhos para salvar o QR code
qr_paths = [
Path('/tmp/admin_qr.png'), # Diretório temporário do sistema
Path('/data/admin_qr.png'), # Diretório de dados do container
Path('admin_qr.png') # Diretório atual (fallback fora do container)
]
# Tentar salvar em diferentes locais
qr_saved = False
saved_path = None
img = generate_qr_code(user) # Gera o QR code para o usuário
for qr_path in qr_paths:
try:
# Tentar salvar o arquivo
img.save(qr_path)
qr_saved = True
saved_path = qr_path
break
except Exception as e:
print(f"Não foi possível salvar o QR code em {qr_path}: {e}")
continue
if not qr_saved:
print("AVISO: Não foi possível salvar o QR code em nenhum local")
print("O QR code pode ser gerado manualmente usando o URI OTP")
saved_path = None
return saved_path
def _ensure_admin_role(db, admin_user, role):
admin_role = db.query(Role).filter_by(nome=role).first()
if admin_role is None:
admin_role = Role(nome=role, nivel=Role.SECRETARIO_GERAL)
db.add(admin_role)
db.flush()
if admin_role not in admin_user.roles:
admin_user.roles.append(admin_role)
db.flush()
def _ensure_admin_otp(db, admin_user):
if admin_user.otp_secret:
return False
secret = (os.environ.get('ADMIN_OTP_SECRET') or "").strip()
admin_user.otp_secret = secret or admin_user.generate_otp_secret()
db.flush()
return True
def create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE, save_qr=True):
"""Limpa e cria o usuário admin"""
db = get_db_session()
try:
# Verificar se já existe um usuário admin
admin_user = db.query(Usuario).filter_by(username=username).first()
if admin_user is not None:
db.delete(admin_user)
db.flush()
print("\n=== Criando Novo Usuário Admin ===")
admin_user = Usuario(
username=username,
email="admin@example.com",
is_admin=True
)
admin_user.set_password(password)
db.add(admin_user)
_ensure_admin_otp(db, admin_user)
_ensure_admin_role(db, admin_user, role)
db.commit()
qr_path = salvar_qr_code(admin_user)
# Mostrar informações
print("\n=== Informações do Admin ===")
print(f"Username: {admin_user.username}")
print(f"Email: {admin_user.email}")
print(f"Senha: {password}")
print(f"Segredo OTP: {admin_user.otp_secret}")
print(f"URI do OTP: {admin_user.get_otp_uri()}")
if qr_path:
print(f"QR Code salvo em: {qr_path}")
else:
print("QR Code não foi salvo. Use o URI do OTP ou o Segredo OTP para configuração manual.")
print("\n=== Instruções para Configuração ===")
print("1. Instale um aplicativo autenticador no seu celular")
print(" (Google Authenticator, Microsoft Authenticator, etc)")
print("2. Abra o aplicativo")
print("3. Selecione a opção para adicionar uma nova conta")
if qr_path:
print("4. Escaneie o QR Code salvo em:", qr_path)
print("\nOU configure manualmente:")
print(f"- Nome da conta: {admin_user.username}")
print(f"- Segredo OTP: {admin_user.otp_secret}")
print("- Tipo: Baseado em tempo (TOTP)")
print("- Algoritmo: SHA1")
print("- Dígitos: 6")
print("- Intervalo: 30 segundos")
# Gerar código atual para verificação
totp = pyotp.TOTP(admin_user.otp_secret)
current_code = totp.now()
print("\n=== Verificação do OTP ===")
print(f"Código OTP atual: {current_code}")
is_valid = admin_user.verify_otp(current_code)
print(f"Verificação do código: {'Sucesso' if is_valid else 'Falha'}")
if not is_valid:
print("\nALERTA: Verificação do OTP falhou!")
print("Por favor, verifique se o segredo OTP está correto.")
# Fazer commit final para garantir que tudo foi salvo
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
db.close()
def verify_admin(username=ADMIN_USERNAME, role=ADMIN_ROLE, save_qr=False):
"""Verifica se o usuário admin existe e tem OTP configurado"""
db = get_db_session()
try:
admin_user = db.query(Usuario).filter_by(username=username).first()
if admin_user is not None:
print("\n=== Usuário Admin Encontrado ===")
_ensure_admin_otp(db, admin_user)
_ensure_admin_role(db, admin_user, role)
return True
else:
print("\n=== Usuário Admin NÃO Encontrado ===")
return False
except Exception as e:
print(f"Erro ao verificar o usuário admin: {e}")
raise
finally:
db.close()
def rotate_admin_otp(username=ADMIN_USERNAME, save_qr=False):
db = get_db_session()
try:
admin_user = db.query(Usuario).filter_by(username=username).first()
if admin_user is None:
print("Usuário admin não encontrado")
return False
admin_user.generate_otp_secret()
db.commit()
print(f"OTP do usuário '{username}' foi rotacionado.")
print(f"Novo segredo OTP: {admin_user.otp_secret}")
if save_qr:
qr_path = salvar_qr_code(admin_user)
if qr_path:
print(f"Novo QR code salvo em: {qr_path}")
else:
print("Não foi possível salvar o QR code automaticamente.")
except Exception:
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
create_admin()

View File

@@ -1,9 +1,9 @@
from functions.database import get_db_connection, Usuario, Role
from functions.database import get_db_session, Usuario, Role
from werkzeug.security import generate_password_hash
def create_test_users():
"""Cria usuários de teste"""
db = get_db_connection()
db = get_db_session()
try:
# Lista de usuários de teste
test_users = [

View File

@@ -1,27 +0,0 @@
from functions.database import Role, Permissao, RolePermissao, Base, engine
from sqlalchemy.orm import Session
def init_db():
Base.metadata.create_all(engine)
with Session(engine) as session:
# Criar roles
admin = Role(nome='Administrador', nivel=1)
coord = Role(nome='Coordenador', nivel=2)
milit = Role(nome='Militante', nivel=3)
# Criar permissões
perm_admin = Permissao(nome='admin', descricao='Acesso total')
perm_militantes = Permissao(nome='ver_militantes', descricao='Ver militantes')
# ... outras permissões ...
session.add_all([admin, coord, milit, perm_admin, perm_militantes])
session.commit()
# Associar permissões aos roles
session.add(RolePermissao(role=admin, permissao=perm_admin))
session.add(RolePermissao(role=coord, permissao=perm_militantes))
session.commit()
if __name__ == '__main__':
init_db()

92
scripts/manage.py Normal file
View File

@@ -0,0 +1,92 @@
import argparse
import sys
from pathlib import Path
from dotenv import load_dotenv
ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
# Carregar .env antes de importar módulos
load_dotenv(ROOT_DIR / ".env")
from functions.base import Base, engine, get_db_session
from functions.rbac import Role, init_rbac
from scripts.create_admin import create_admin, rotate_admin_otp
from scripts.create_test_users import create_test_users
from scripts.seed_database import seed_database
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin123"
ADMIN_ROLE = Role.SECRETARIO_GERAL
def reset_db(args):
"""Inicializando banco de dados e criando tabelas"""
db = get_db_session()
try:
# Criar todas as tabelas
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
except Exception as e:
print(f"Erro na drop ou create all da Base: {e}")
db.rollback()
raise
finally:
db.close()
print("Inicializando sistema RBAC...")
init_rbac()
print("Cria usuário admin...")
create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE)
print("Banco inicializado com sucesso.")
return 0
def seed_db_with_fakes(args):
"""Função para popular o banco com dados fake para desenvolvimento"""
seed_database()
def seed_db_test_users(args):
"""Função para popular o banco com dados fake para desenvolvimento"""
create_test_users()
def reset_admin(args):
create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE)
def rotate_admin_otp_cmd(args):
rotate_admin_otp(username=ADMIN_USERNAME, save_qr=True)
def build_parser():
parser = argparse.ArgumentParser(description="Gerenciador de comandos do sistema Controles")
subparsers = parser.add_subparsers(dest="command", required=True)
db_reset_parser = subparsers.add_parser("db_reset", help="Reseta o banco e recria tabelas, RBAC e admin")
db_reset_parser.set_defaults(func=reset_db)
db_seed_fake_parser = subparsers.add_parser("db_seed_fake", help="Adiciona dados falsos para desenvolvimento")
db_seed_fake_parser.set_defaults(func=seed_db_with_fakes)
db_seed_test_users_parser = subparsers.add_parser("db_seed_test_users", help="Adiciona usuários de teste para desenvolvimento")
db_seed_test_users_parser.set_defaults(func=seed_db_test_users)
admin_reset_parser = subparsers.add_parser("admin_reset", help="Reseta o usuário admin (padrão: admin123)")
admin_reset_parser.set_defaults(func=reset_admin)
admin_rotate_otp_parser = subparsers.add_parser("admin_rotate_otp", help="Rotaciona o OTP do usuário admin - se não definido em .env")
admin_rotate_otp_parser.set_defaults(func=rotate_admin_otp_cmd)
return parser
def main():
parser = build_parser()
args = parser.parse_args()
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -2,32 +2,31 @@ from datetime import datetime, timedelta
from functions.database import (
Base, Militante, CotaMensal, TipoPagamento, Pagamento,
MaterialVendido, TipoMaterial, VendaJornalAvulso, AssinaturaAnual,
RelatorioCotasMensais, RelatorioVendasMateriais, engine, SessionLocal,
RelatorioCotasMensais, RelatorioVendasMateriais,
Setor, ComiteCentral, Usuario, Role, EmailMilitante, Endereco,
ComiteRegional, Celula, EstadoMilitante
ComiteRegional, Celula, EstadoMilitante, get_db_session
)
import random
from faker import Faker
import time
from werkzeug.security import generate_password_hash
fake = Faker('pt_BR')
def criar_estrutura_organizacional(session):
def criar_estrutura_organizacional(db):
"""Cria a estrutura organizacional básica"""
print("\nCriando estrutura organizacional...")
# Criar Comitê Central
cc = ComiteCentral(nome="Comitê Central SP")
session.add(cc)
session.flush()
db.add(cc)
db.flush()
# Criar Comitês Regionais
crs = []
for nome in ["CR São Paulo", "CR ABC", "CR Campinas"]:
cr = ComiteRegional(nome=nome)
session.add(cr)
session.flush()
db.add(cr)
db.flush()
crs.append(cr)
# Criar Setores para cada CR
@@ -38,8 +37,8 @@ def criar_estrutura_organizacional(session):
nome=f"Setor {i+1} - {cr.nome}",
cr_id=cr.id
)
session.add(setor)
session.flush()
db.add(setor)
db.flush()
setores.append(setor)
# Criar Células para cada Setor
@@ -49,12 +48,12 @@ def criar_estrutura_organizacional(session):
nome=f"Célula {i+1} - {setor.nome}",
setor_id=setor.id
)
session.add(celula)
db.add(celula)
session.commit()
db.commit()
return crs, setores
def criar_tipos_pagamento(session):
def criar_tipos_pagamento(db):
"""Cria tipos de pagamento padrão"""
print("\nCriando tipos de pagamento...")
tipos = [
@@ -65,11 +64,11 @@ def criar_tipos_pagamento(session):
"Transferência Bancária"
]
for tipo in tipos:
if not session.query(TipoPagamento).filter_by(descricao=tipo).first():
session.add(TipoPagamento(descricao=tipo))
session.commit()
if not db.query(TipoPagamento).filter_by(descricao=tipo).first():
db.add(TipoPagamento(descricao=tipo))
db.commit()
def criar_tipos_material(session):
def criar_tipos_material(db):
"""Cria tipos de material padrão"""
print("\nCriando tipos de material...")
tipos = [
@@ -80,11 +79,11 @@ def criar_tipos_material(session):
"Cartilha"
]
for tipo in tipos:
if not session.query(TipoMaterial).filter_by(descricao=tipo).first():
session.add(TipoMaterial(descricao=tipo))
session.commit()
if not db.query(TipoMaterial).filter_by(descricao=tipo).first():
db.add(TipoMaterial(descricao=tipo))
db.commit()
def criar_militantes(session, num_militantes, setores):
def criar_militantes(db, num_militantes, setores):
"""Cria militantes com todos os dados necessários"""
print(f"\nCriando {num_militantes} militantes...")
militantes = []
@@ -96,13 +95,6 @@ def criar_militantes(session, num_militantes, setores):
nome = fake.name()
cpf = fake.cpf()
# Email único
while True:
email = fake.email()
if email not in emails_usados:
emails_usados.add(email)
break
# Criar endereço
endereco = Endereco(
cep=fake.postcode(),
@@ -113,12 +105,12 @@ def criar_militantes(session, num_militantes, setores):
numero=str(random.randint(1, 999)),
complemento=f"Bloco {random.randint(1, 10)}, Apto {random.randint(1, 999)}" if random.random() < 0.3 else None
)
session.add(endereco)
session.flush()
db.add(endereco)
db.flush()
# Selecionar setor e célula aleatórios
setor = random.choice(setores)
celula = random.choice(session.query(Celula).filter_by(setor_id=setor.id).all())
celula = random.choice(db.query(Celula).filter_by(setor_id=setor.id).all())
# Definir responsabilidades
responsabilidades = 0
@@ -168,27 +160,34 @@ def criar_militantes(session, num_militantes, setores):
responsabilidades=responsabilidades,
estado=random.choice(list(EstadoMilitante))
)
session.add(militante)
session.flush()
db.add(militante)
db.flush()
# Email único
while True:
email = fake.email()
if email not in emails_usados:
emails_usados.add(email)
break
# Criar email do militante
email_militante = EmailMilitante(
militante_id=militante.id,
endereco_email=email
)
session.add(email_militante)
db.add(email_militante)
militantes.append(militante)
session.commit()
db.commit()
except Exception as e:
print(f"Erro ao criar militante {i+1}: {e}")
session.rollback()
db.rollback()
continue
return militantes
def criar_cotas(session, militantes):
def criar_cotas(db, militantes):
"""Cria cotas mensais para os militantes"""
print("\nCriando cotas mensais...")
for militante in militantes:
@@ -205,16 +204,16 @@ def criar_cotas(session, militantes):
data_vencimento=data_base + timedelta(days=30),
pago=random.choice([True, False])
)
session.add(cota)
session.commit()
db.add(cota)
db.commit()
except Exception as e:
print(f"Erro ao criar cotas para militante {militante.nome}: {e}")
session.rollback()
db.rollback()
def criar_pagamentos(session, militantes):
def criar_pagamentos(db, militantes):
"""Cria pagamentos para os militantes"""
print("\nCriando pagamentos...")
tipos_pagamento = session.query(TipoPagamento).all()
tipos_pagamento = db.query(TipoPagamento).all()
for militante in militantes:
try:
@@ -227,16 +226,16 @@ def criar_pagamentos(session, militantes):
valor=random.uniform(50, 500),
data_pagamento=fake.date_between(start_date='-1y', end_date='today')
)
session.add(pagamento)
session.commit()
db.add(pagamento)
db.commit()
except Exception as e:
print(f"Erro ao criar pagamentos para militante {militante.nome}: {e}")
session.rollback()
db.rollback()
def criar_materiais_vendidos(session, militantes):
def criar_materiais_vendidos(db, militantes):
"""Cria registros de materiais vendidos"""
print("\nCriando materiais vendidos...")
tipos_material = session.query(TipoMaterial).all()
tipos_material = db.query(TipoMaterial).all()
for militante in militantes:
try:
@@ -249,13 +248,13 @@ def criar_materiais_vendidos(session, militantes):
valor=random.uniform(20, 100),
data_venda=fake.date_time_between(start_date='-1y', end_date='now')
)
session.add(material)
session.commit()
db.add(material)
db.commit()
except Exception as e:
print(f"Erro ao criar materiais vendidos para militante {militante.nome}: {e}")
session.rollback()
db.rollback()
def criar_vendas_jornal(session, militantes):
def criar_vendas_jornal(db, militantes):
"""Cria vendas de jornal avulso"""
print("\nCriando vendas de jornal...")
for militante in militantes:
@@ -270,16 +269,16 @@ def criar_vendas_jornal(session, militantes):
valor_total=quantidade * valor_unitario,
data_venda=fake.date_time_between(start_date='-1y', end_date='now')
)
session.add(venda)
session.commit()
db.add(venda)
db.commit()
except Exception as e:
print(f"Erro ao criar vendas de jornal para militante {militante.nome}: {e}")
session.rollback()
db.rollback()
def criar_assinaturas(session, militantes):
def criar_assinaturas(db, militantes):
"""Cria assinaturas anuais"""
print("\nCriando assinaturas anuais...")
tipos_material = session.query(TipoMaterial).all()
tipos_material = db.query(TipoMaterial).all()
for militante in militantes:
try:
@@ -294,42 +293,45 @@ def criar_assinaturas(session, militantes):
data_inicio=data_inicio,
data_fim=data_inicio + timedelta(days=365)
)
session.add(assinatura)
session.commit()
db.add(assinatura)
db.commit()
except Exception as e:
print(f"Erro ao criar assinatura para militante {militante.nome}: {e}")
session.rollback()
db.rollback()
def seed_database():
"""Função principal para popular o banco de dados"""
session = SessionLocal()
db = get_db_session()
try:
print("Iniciando população do banco de dados...")
# Criar estrutura organizacional
crs, setores = criar_estrutura_organizacional(session)
# Criar tipos básicos
criar_tipos_pagamento(session)
criar_tipos_material(session)
criar_tipos_pagamento(db)
criar_tipos_material(db)
# Criar estrutura organizacional
crs, setores = criar_estrutura_organizacional(db)
# Criar militantes (30 militantes para teste)
militantes = criar_militantes(session, 30, setores)
militantes = criar_militantes(db, 30, setores)
# Criar dados financeiros e materiais
criar_cotas(session, militantes)
criar_pagamentos(session, militantes)
criar_materiais_vendidos(session, militantes)
criar_vendas_jornal(session, militantes)
criar_assinaturas(session, militantes)
criar_cotas(db, militantes)
criar_pagamentos(db, militantes)
criar_materiais_vendidos(db, militantes)
criar_vendas_jornal(db, militantes)
criar_assinaturas(db, militantes)
print("\nBanco de dados populado com sucesso!")
except Exception as e:
print(f"Erro durante a população do banco: {e}")
session.rollback()
db.rollback()
finally:
session.close()
db.close()
if __name__ == "__main__":
seed_database()
seed_database()

View File

@@ -1,11 +1,8 @@
from functions.database import get_db_connection, Usuario
from functions.database import get_db_session, Usuario
from flask_login import login_user, logout_user
from datetime import datetime
from typing import Dict, Optional
import pyotp
import qrcode
import base64
from io import BytesIO
from services.otp_service import generate_qr_code_base64
class AuthService:
"""Service para operações de autenticação"""
@@ -13,7 +10,7 @@ class AuthService:
@staticmethod
def autenticar_usuario(email_or_username: str, password: str, otp: str = None) -> Dict:
"""Autentica um usuário"""
db = get_db_connection()
db = get_db_session()
try:
# Tenta encontrar o usuário por email ou username
user = db.query(Usuario).filter(
@@ -64,7 +61,7 @@ class AuthService:
@staticmethod
def desautenticar_usuario(user) -> Dict:
"""Desautentica um usuário"""
db = get_db_connection()
db = get_db_session()
try:
if user:
user.logout()
@@ -87,7 +84,7 @@ class AuthService:
@staticmethod
def alterar_senha(user_id: int, senha_atual: str, nova_senha: str) -> Dict:
"""Altera a senha de um usuário"""
db = get_db_connection()
db = get_db_session()
try:
user = db.query(Usuario).get(user_id)
if not user:
@@ -122,20 +119,7 @@ class AuthService:
@staticmethod
def gerar_qr_code(user) -> str:
"""Gera um QR code para o usuário"""
if not user.otp_secret:
user.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(user.otp_secret)
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(totp.provisioning_uri(user.email, issuer_name="Sistema de Controles"))
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = BytesIO()
img.save(buffer, format="PNG")
qr_code = base64.b64encode(buffer.getvalue()).decode('utf-8')
return qr_code
return generate_qr_code_base64(user)
@staticmethod
def verificar_sessao(user) -> Dict:
@@ -154,4 +138,4 @@ class AuthService:
return {
'valid': True
}
}

View File

@@ -1,4 +1,4 @@
from functions.database import get_db_connection, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento
from functions.database import get_db_session, Militante, Pagamento, CotaMensal, MaterialVendido, AssinaturaAnual, TipoPagamento
from sqlalchemy import func
from sqlalchemy.orm import joinedload
from datetime import datetime, timedelta
@@ -15,7 +15,7 @@ class DashboardService:
@cached(expire=300, key_prefix="dashboard") # Cache for 5 minutes
def get_dashboard_stats() -> Dict[str, Any]:
"""Get dashboard statistics with caching"""
db = get_db_connection()
db = get_db_session()
try:
# Get cached stats first
cache_key = CacheKeys.DASHBOARD_STATS
@@ -146,7 +146,7 @@ class DashboardService:
@cached(expire=600, key_prefix="dashboard") # Cache for 10 minutes
def get_militante_stats() -> Dict[str, Any]:
"""Get militante-specific statistics"""
db = get_db_connection()
db = get_db_session()
try:
# Militantes por estado
estados = db.query(Militante.estado, func.count(Militante.id)).group_by(Militante.estado).all()
@@ -175,7 +175,7 @@ class DashboardService:
@cached(expire=300, key_prefix="dashboard")
def get_financial_stats() -> Dict[str, Any]:
"""Get financial statistics"""
db = get_db_connection()
db = get_db_session()
try:
# Total de pagamentos
total_pagamentos = db.query(func.sum(Pagamento.valor)).scalar()
@@ -218,7 +218,7 @@ class DashboardService:
@staticmethod
def obter_ultimos_militantes(limite: int = 5) -> List[Militante]:
"""Obtém os últimos militantes cadastrados"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Militante).order_by(Militante.id.desc()).limit(limite).all()
finally:
@@ -227,7 +227,7 @@ class DashboardService:
@staticmethod
def obter_ultimos_pagamentos(limite: int = 5) -> List[Pagamento]:
"""Obtém os últimos pagamentos realizados"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(Pagamento).join(Militante).order_by(Pagamento.data_pagamento.desc()).limit(limite).all()
finally:
@@ -236,7 +236,7 @@ class DashboardService:
@staticmethod
def obter_tipos_pagamento() -> List[TipoPagamento]:
"""Obtém todos os tipos de pagamento"""
db = get_db_connection()
db = get_db_session()
try:
return db.query(TipoPagamento).all()
finally:

19
services/otp_service.py Normal file
View File

@@ -0,0 +1,19 @@
import base64
import pyotp
import qrcode
from io import BytesIO
def generate_qr_code(user):
"""Gera imagem PIL do QR code OTP para o usuário."""
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(user.get_otp_uri())
qr.make(fit=True)
return qr.make_image(fill_color="black", back_color="white")
def generate_qr_code_base64(user):
"""Gera QR code OTP codificado em base64 (PNG)."""
img = generate_qr_code(user)
buffer = BytesIO()
img.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode("utf-8")

View File

@@ -2,10 +2,15 @@ import os
import sqlite3
import sys
from pathlib import Path
from dotenv import load_dotenv
# Adiciona o diretório raiz ao PYTHONPATH
root_dir = str(Path(__file__).parent.parent)
sys.path.append(root_dir)
ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
# Carregar .env antes de importar módulos
load_dotenv(ROOT_DIR / ".env")
from functions.base import Base, engine
from functions.database import init_database
@@ -63,4 +68,4 @@ def migrate_database():
print("Migração concluída com sucesso!")
if __name__ == '__main__':
migrate_database()
migrate_database()

View File

@@ -1,25 +1,25 @@
from functions.database import get_db_connection, Usuario
from functions.database import get_db_session, Usuario
from functions.rbac import Role, Permission
def migrate_existing_users():
"""Migra os usuários existentes para o novo sistema RBAC"""
session = get_db_connection()
db = get_db_session()
try:
# Buscar todos os usuários
usuarios = session.query(Usuario).all()
usuarios = db.query(Usuario).all()
# Buscar ou criar role de administrador
admin_role = session.query(Role).filter_by(nome="Administrador").first()
admin_role = db.query(Role).filter_by(nome="Administrador").first()
if not admin_role:
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL)
session.add(admin_role)
db.add(admin_role)
# Buscar ou criar role de militante básico
militante_role = session.query(Role).filter_by(nome="Militante Básico").first()
militante_role = db.query(Role).filter_by(nome="Militante Básico").first()
if not militante_role:
militante_role = Role(nome="Militante Básico", nivel=Role.MILITANTE_BASICO)
session.add(militante_role)
db.add(militante_role)
# Atualizar usuários
for usuario in usuarios:
@@ -33,15 +33,15 @@ def migrate_existing_users():
else:
usuario.roles.append(militante_role)
session.commit()
db.commit()
print("Migração de usuários concluída com sucesso!")
except Exception as e:
session.rollback()
db.rollback()
print(f"Erro durante a migração de usuários: {str(e)}")
raise e
finally:
session.close()
db.close()
if __name__ == '__main__':
migrate_existing_users()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 135 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 122 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 141 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

View File

@@ -1,6 +1,6 @@
import pytest
from app import create_app
from functions.database import init_database, get_db_connection
from functions.database import init_database, get_db_session
@pytest.fixture
def app():
@@ -15,7 +15,7 @@ def app():
yield app
# Limpar banco após os testes
db = get_db_connection()
db = get_db_session()
try:
db.execute('DROP TABLE IF EXISTS usuarios CASCADE')
db.commit()

View File

@@ -1,13 +1,13 @@
import pytest
from flask import url_for
from functions.database import Usuario, get_db_connection
from functions.database import Usuario, get_db_session
from werkzeug.security import generate_password_hash
import json
@pytest.fixture
def admin_user(client):
"""Fixture que cria um usuário admin para testes"""
db = get_db_connection()
db = get_db_session()
try:
admin = Usuario(
username='admin_test',
@@ -74,7 +74,7 @@ def test_toggle_status(auth_admin_client, admin_user):
def test_acesso_nao_admin(client):
"""Testa acesso de usuário não admin"""
db = get_db_connection()
db = get_db_session()
try:
# Criar usuário normal
user = Usuario(

View File

@@ -1,7 +1,7 @@
import pytest
from flask import url_for
from flask_login import login_user
from functions.database import get_db_connection, Usuario
from functions.database import get_db_session, Usuario
import pyotp
class TestMenuNavigation: