- inits centralizados, READMEs atualizados

- padronizando o nome de get_db_connection e session para get_db_session, para não confundir com session do Flask ou sessoes web

- corrigindo potenciais erros

-- has_permission nao consegue com lazy load carregar permission depois de load_user fechar a conexao, entao joinedLoad com Permission antes de fechar

-- db.rollback não existe caso db = get_db_session() apareça muito depois dentro do try, padronizando antes de try

--- comparar role por nivel (Role.SECRETARIO_GERAL) e nao por nome ("Secretario Geral")

- unificacao de get_otp_qr_code

- mudança de nowutc() para now(UTC) conforme novo padrão
This commit is contained in:
2026-02-20 17:19:15 -03:00
parent 6882b57081
commit 2b1668206d
38 changed files with 1250 additions and 1187 deletions

198
scripts/create_admin.py Normal file
View File

@@ -0,0 +1,198 @@
import os
import pyotp
from pathlib import Path
from functions.database import Usuario, Role, get_db_session
from services.otp_service import generate_qr_code
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin123"
ADMIN_ROLE = Role.SECRETARIO_GERAL
def salvar_qr_code(user):
"""
Gera o QR code para um usuário específico
Args:
user: Instância do modelo Usuario
Returns:
tuple: (caminho do arquivo, URI do OTP)
"""
# Tentar diferentes caminhos para salvar o QR code
qr_paths = [
Path('/tmp/admin_qr.png'), # Diretório temporário do sistema
Path('/data/admin_qr.png'), # Diretório de dados do container
Path('admin_qr.png') # Diretório atual (fallback fora do container)
]
# Tentar salvar em diferentes locais
qr_saved = False
saved_path = None
img = generate_qr_code(user) # Gera o QR code para o usuário
for qr_path in qr_paths:
try:
# Tentar salvar o arquivo
img.save(qr_path)
qr_saved = True
saved_path = qr_path
break
except Exception as e:
print(f"Não foi possível salvar o QR code em {qr_path}: {e}")
continue
if not qr_saved:
print("AVISO: Não foi possível salvar o QR code em nenhum local")
print("O QR code pode ser gerado manualmente usando o URI OTP")
saved_path = None
return saved_path
def _ensure_admin_role(db, admin_user, role):
admin_role = db.query(Role).filter_by(nome=role).first()
if admin_role is None:
admin_role = Role(nome=role, nivel=Role.SECRETARIO_GERAL)
db.add(admin_role)
db.flush()
if admin_role not in admin_user.roles:
admin_user.roles.append(admin_role)
db.flush()
def _ensure_admin_otp(db, admin_user):
if admin_user.otp_secret:
return False
secret = (os.environ.get('ADMIN_OTP_SECRET') or "").strip()
admin_user.otp_secret = secret or admin_user.generate_otp_secret()
db.flush()
return True
def create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE, save_qr=True):
"""Limpa e cria o usuário admin"""
db = get_db_session()
try:
# Verificar se já existe um usuário admin
admin_user = db.query(Usuario).filter_by(username=username).first()
if admin_user is not None:
db.delete(admin_user)
db.flush()
print("\n=== Criando Novo Usuário Admin ===")
admin_user = Usuario(
username=username,
email="admin@example.com",
is_admin=True
)
admin_user.set_password(password)
_ensure_admin_otp(db, admin_user)
_ensure_admin_role(db, admin_user, role)
db.add(admin_user)
db.commit()
qr_path = salvar_qr_code(admin_user)
# Mostrar informações
print("\n=== Informações do Admin ===")
print(f"Username: {admin_user.username}")
print(f"Email: {admin_user.email}")
print(f"Senha: {password}")
print(f"Segredo OTP: {admin_user.otp_secret}")
print(f"URI do OTP: {admin_user.get_otp_uri()}")
if qr_path:
print(f"QR Code salvo em: {qr_path}")
else:
print("QR Code não foi salvo. Use o URI do OTP ou o Segredo OTP para configuração manual.")
print("\n=== Instruções para Configuração ===")
print("1. Instale um aplicativo autenticador no seu celular")
print(" (Google Authenticator, Microsoft Authenticator, etc)")
print("2. Abra o aplicativo")
print("3. Selecione a opção para adicionar uma nova conta")
if qr_path:
print("4. Escaneie o QR Code salvo em:", qr_path)
print("\nOU configure manualmente:")
print(f"- Nome da conta: {admin_user.username}")
print(f"- Segredo OTP: {admin_user.otp_secret}")
print("- Tipo: Baseado em tempo (TOTP)")
print("- Algoritmo: SHA1")
print("- Dígitos: 6")
print("- Intervalo: 30 segundos")
# Gerar código atual para verificação
totp = pyotp.TOTP(admin_user.otp_secret)
current_code = totp.now()
print("\n=== Verificação do OTP ===")
print(f"Código OTP atual: {current_code}")
is_valid = admin_user.verify_otp(current_code)
print(f"Verificação do código: {'Sucesso' if is_valid else 'Falha'}")
if not is_valid:
print("\nALERTA: Verificação do OTP falhou!")
print("Por favor, verifique se o segredo OTP está correto.")
# Fazer commit final para garantir que tudo foi salvo
db.commit()
except Exception as e:
db.rollback()
raise e
finally:
db.close()
def verify_admin(username=ADMIN_USERNAME, role=ADMIN_ROLE, save_qr=False):
"""Verifica se o usuário admin existe e tem OTP configurado"""
db = get_db_session()
try:
admin_user = db.query(Usuario).filter_by(username=username).first()
if admin_user is not None:
print("\n=== Usuário Admin Encontrado ===")
_ensure_admin_otp(db, admin_user)
_ensure_admin_role(db, admin_user, role)
return True
else:
print("\n=== Usuário Admin NÃO Encontrado ===")
return False
except Exception as e:
print(f"Erro ao verificar o usuário admin: {e}")
raise
finally:
db.close()
def rotate_admin_otp(username=ADMIN_USERNAME, save_qr=False):
db = get_db_session()
try:
admin_user = db.query(Usuario).filter_by(username=username).first()
if admin_user is None:
print("Usuário admin não encontrado")
return False
admin_user.generate_otp_secret()
db.commit()
print(f"OTP do usuário '{username}' foi rotacionado.")
print(f"Novo segredo OTP: {admin_user.otp_secret}")
if save_qr:
qr_path = salvar_qr_code(admin_user)
if qr_path:
print(f"Novo QR code salvo em: {qr_path}")
else:
print("Não foi possível salvar o QR code automaticamente.")
except Exception:
db.rollback()
raise
finally:
db.close()
if __name__ == "__main__":
create_admin()

View File

@@ -0,0 +1,56 @@
from functions.database import get_db_session, Usuario, Role
from werkzeug.security import generate_password_hash
def create_test_users():
"""Cria usuários de teste"""
db = get_db_session()
try:
# Lista de usuários de teste
test_users = [
{
'username': 'aligner',
'email': 'aligner@test.com',
'password': 'Test123!@#',
'is_admin': False
},
{
'username': 'tester',
'email': 'tester@test.com',
'password': 'Test123!@#',
'is_admin': False
},
{
'username': 'deployer',
'email': 'deployer@test.com',
'password': 'Test123!@#',
'is_admin': False
}
]
# Criar cada usuário
for user_data in test_users:
user = db.query(Usuario).filter_by(username=user_data['username']).first()
if not user:
user = Usuario(
username=user_data['username'],
email=user_data['email'],
is_admin=user_data['is_admin']
)
user.set_password(user_data['password'])
db.add(user)
print(f"Usuário {user_data['username']} criado")
else:
print(f"Usuário {user_data['username']} já existe")
db.commit()
print("Usuários de teste criados com sucesso")
except Exception as e:
print(f"Erro ao criar usuários de teste: {str(e)}")
db.rollback()
finally:
db.close()
if __name__ == "__main__":
create_test_users()

View File

@@ -1,27 +0,0 @@
from functions.database import Role, Permissao, RolePermissao, Base, engine
from sqlalchemy.orm import Session
def init_db():
Base.metadata.create_all(engine)
with Session(engine) as session:
# Criar roles
admin = Role(nome='Administrador', nivel=1)
coord = Role(nome='Coordenador', nivel=2)
milit = Role(nome='Militante', nivel=3)
# Criar permissões
perm_admin = Permissao(nome='admin', descricao='Acesso total')
perm_militantes = Permissao(nome='ver_militantes', descricao='Ver militantes')
# ... outras permissões ...
session.add_all([admin, coord, milit, perm_admin, perm_militantes])
session.commit()
# Associar permissões aos roles
session.add(RolePermissao(role=admin, permissao=perm_admin))
session.add(RolePermissao(role=coord, permissao=perm_militantes))
session.commit()
if __name__ == '__main__':
init_db()

92
scripts/manage.py Normal file
View File

@@ -0,0 +1,92 @@
import argparse
import sys
from pathlib import Path
from dotenv import load_dotenv
ROOT_DIR = Path(__file__).resolve().parents[1]
if str(ROOT_DIR) not in sys.path:
sys.path.insert(0, str(ROOT_DIR))
# Carregar .env antes de importar módulos
load_dotenv(ROOT_DIR / ".env")
from functions.base import Base, engine, get_db_session
from functions.rbac import Role, init_rbac
from scripts.create_admin import create_admin, rotate_admin_otp
from scripts.create_test_users import create_test_users
from scripts.seed_database import seed_database
ADMIN_USERNAME = "admin"
ADMIN_PASSWORD = "admin123"
ADMIN_ROLE = Role.SECRETARIO_GERAL
def reset_db(args):
"""Inicializando banco de dados e criando tabelas"""
db = get_db_session()
try:
# Criar todas as tabelas
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
except Exception as e:
print(f"Erro na drop ou create all da Base: {e}")
db.rollback()
raise
finally:
db.close()
print("Inicializando sistema RBAC...")
init_rbac()
print("Cria usuário admin...")
create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE)
print("Banco inicializado com sucesso.")
return 0
def seed_db_with_fakes(args):
"""Função para popular o banco com dados fake para desenvolvimento"""
seed_database()
def seed_db_test_users(args):
"""Função para popular o banco com dados fake para desenvolvimento"""
create_test_users()
def reset_admin(args):
create_admin(username=ADMIN_USERNAME, password=ADMIN_PASSWORD, role=ADMIN_ROLE)
def rotate_admin_otp_cmd(args):
rotate_admin_otp(username=ADMIN_USERNAME, save_qr=True)
def build_parser():
parser = argparse.ArgumentParser(description="Gerenciador de comandos do sistema Controles")
subparsers = parser.add_subparsers(dest="command", required=True)
db_reset_parser = subparsers.add_parser("db_reset", help="Reseta o banco e recria tabelas, RBAC e admin")
db_reset_parser.set_defaults(func=reset_db)
db_seed_fake_parser = subparsers.add_parser("db_seed_fake", help="Adiciona dados falsos para desenvolvimento")
db_seed_fake_parser.set_defaults(func=seed_db_with_fakes)
db_seed_test_users_parser = subparsers.add_parser("db_seed_test_users", help="Adiciona usuários de teste para desenvolvimento")
db_seed_test_users_parser.set_defaults(func=seed_db_test_users)
admin_reset_parser = subparsers.add_parser("admin_reset", help="Reseta o usuário admin (padrão: admin123)")
admin_reset_parser.set_defaults(func=reset_admin)
admin_rotate_otp_parser = subparsers.add_parser("admin_rotate_otp", help="Rotaciona o OTP do usuário admin - se não definido em .env")
admin_rotate_otp_parser.set_defaults(func=rotate_admin_otp_cmd)
return parser
def main():
parser = build_parser()
args = parser.parse_args()
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())

337
scripts/seed_database.py Normal file
View File

@@ -0,0 +1,337 @@
from datetime import datetime, timedelta
from functions.database import (
Base, Militante, CotaMensal, TipoPagamento, Pagamento,
MaterialVendido, TipoMaterial, VendaJornalAvulso, AssinaturaAnual,
RelatorioCotasMensais, RelatorioVendasMateriais,
Setor, ComiteCentral, Usuario, Role, EmailMilitante, Endereco,
ComiteRegional, Celula, EstadoMilitante, get_db_session
)
import random
from faker import Faker
from werkzeug.security import generate_password_hash
fake = Faker('pt_BR')
def criar_estrutura_organizacional(db):
"""Cria a estrutura organizacional básica"""
print("\nCriando estrutura organizacional...")
# Criar Comitê Central
cc = ComiteCentral(nome="Comitê Central SP")
db.add(cc)
db.flush()
# Criar Comitês Regionais
crs = []
for nome in ["CR São Paulo", "CR ABC", "CR Campinas"]:
cr = ComiteRegional(nome=nome)
db.add(cr)
db.flush()
crs.append(cr)
# Criar Setores para cada CR
setores = []
for cr in crs:
for i in range(2): # 2 setores por CR
setor = Setor(
nome=f"Setor {i+1} - {cr.nome}",
cr_id=cr.id
)
db.add(setor)
db.flush()
setores.append(setor)
# Criar Células para cada Setor
for setor in setores:
for i in range(2): # 2 células por setor
celula = Celula(
nome=f"Célula {i+1} - {setor.nome}",
setor_id=setor.id
)
db.add(celula)
db.commit()
return crs, setores
def criar_tipos_pagamento(db):
"""Cria tipos de pagamento padrão"""
print("\nCriando tipos de pagamento...")
tipos = [
"Dinheiro",
"PIX",
"Cartão de Crédito",
"Cartão de Débito",
"Transferência Bancária"
]
for tipo in tipos:
if not db.query(TipoPagamento).filter_by(descricao=tipo).first():
db.add(TipoPagamento(descricao=tipo))
db.commit()
def criar_tipos_material(db):
"""Cria tipos de material padrão"""
print("\nCriando tipos de material...")
tipos = [
"Jornal",
"Revista",
"Livro",
"Panfleto",
"Cartilha"
]
for tipo in tipos:
if not db.query(TipoMaterial).filter_by(descricao=tipo).first():
db.add(TipoMaterial(descricao=tipo))
db.commit()
def criar_militantes(db, num_militantes, setores):
"""Cria militantes com todos os dados necessários"""
print(f"\nCriando {num_militantes} militantes...")
militantes = []
emails_usados = set()
for i in range(num_militantes):
try:
# Dados básicos
nome = fake.name()
cpf = fake.cpf()
# Criar endereço
endereco = Endereco(
cep=fake.postcode(),
estado=fake.estado_sigla(),
cidade=fake.city(),
bairro=fake.bairro(),
rua=fake.street_name(),
numero=str(random.randint(1, 999)),
complemento=f"Bloco {random.randint(1, 10)}, Apto {random.randint(1, 999)}" if random.random() < 0.3 else None
)
db.add(endereco)
db.flush()
# Selecionar setor e célula aleatórios
setor = random.choice(setores)
celula = random.choice(db.query(Celula).filter_by(setor_id=setor.id).all())
# Definir responsabilidades
responsabilidades = 0
if random.random() < 0.2: # 20% chance de ser Responsável de Finanças
responsabilidades |= Militante.RESPONSAVEL_FINANCAS
if random.random() < 0.2: # 20% chance de ser Responsável de Imprensa
responsabilidades |= Militante.RESPONSAVEL_IMPRENSA
if random.random() < 0.2: # 20% chance de ser Quadro-Orientador
responsabilidades |= Militante.QUADRO_ORIENTADOR
if random.random() < 0.2: # 20% chance de ser Secretário
responsabilidades |= Militante.SECRETARIO
if random.random() < 0.2: # 20% chance de ser MPS
responsabilidades |= Militante.MPS
if random.random() < 0.2: # 20% chance de ser Tesoureiro
responsabilidades |= Militante.TESOUREIRO
if random.random() < 0.2: # 20% chance de ser MNS
responsabilidades |= Militante.MNS
if random.random() < 0.2: # 20% chance de ser da Juventude
responsabilidades |= Militante.JUVENTUDE
if random.random() < 0.3: # 30% chance de ser Aspirante
responsabilidades |= Militante.ASPIRANTE
print(f"Criando militante {i+1}: {nome}")
# Criar militante com todos os dados
militante = Militante(
nome=nome,
cpf=cpf,
titulo_eleitoral=str(random.randint(100000000000, 999999999999)),
data_nascimento=fake.date_of_birth(minimum_age=18, maximum_age=65),
data_entrada_oci=fake.date_between(start_date='-5y', end_date='today'),
data_efetivacao_oci=fake.date_between(start_date='-4y', end_date='today'),
telefone1=fake.phone_number(),
telefone2=fake.phone_number() if random.random() < 0.3 else None,
profissao=fake.job(),
regime_trabalho=random.choice(['CLT', 'PJ', 'Estatutário', 'Autônomo']),
empresa=fake.company(),
contratante=fake.company() if random.random() < 0.2 else None,
instituicao_ensino=fake.company() if random.random() < 0.4 else None,
tipo_instituicao=random.choice(['Federal', 'Estadual', 'Municipal', 'Privada']) if random.random() < 0.4 else None,
sindicato=fake.company() if random.random() < 0.6 else None,
cargo_sindical=random.choice(['Diretor', 'Delegado', 'Conselheiro']) if random.random() < 0.3 else None,
dirigente_sindical=random.random() < 0.2,
central_sindical=random.choice(['CUT', 'CSP-Conlutas', 'CTB', 'Força Sindical']) if random.random() < 0.4 else None,
endereco_id=endereco.id,
celula_id=celula.id,
responsabilidades=responsabilidades,
estado=random.choice(list(EstadoMilitante))
)
db.add(militante)
db.flush()
# Email único
while True:
email = fake.email()
if email not in emails_usados:
emails_usados.add(email)
break
# Criar email do militante
email_militante = EmailMilitante(
militante_id=militante.id,
endereco_email=email
)
db.add(email_militante)
militantes.append(militante)
db.commit()
except Exception as e:
print(f"Erro ao criar militante {i+1}: {e}")
db.rollback()
continue
return militantes
def criar_cotas(db, militantes):
"""Cria cotas mensais para os militantes"""
print("\nCriando cotas mensais...")
for militante in militantes:
try:
# Criar 12 cotas (1 ano) para cada militante
for i in range(12):
data_base = datetime.now() - timedelta(days=30 * i)
valor = random.uniform(50, 200)
cota = CotaMensal(
militante_id=militante.id,
valor_antigo=valor,
valor_novo=valor * 1.1,
data_alteracao=data_base,
data_vencimento=data_base + timedelta(days=30),
pago=random.choice([True, False])
)
db.add(cota)
db.commit()
except Exception as e:
print(f"Erro ao criar cotas para militante {militante.nome}: {e}")
db.rollback()
def criar_pagamentos(db, militantes):
"""Cria pagamentos para os militantes"""
print("\nCriando pagamentos...")
tipos_pagamento = db.query(TipoPagamento).all()
for militante in militantes:
try:
# Criar entre 3 e 8 pagamentos por militante
for _ in range(random.randint(3, 8)):
tipo = random.choice(tipos_pagamento)
pagamento = Pagamento(
militante_id=militante.id,
tipo_pagamento=tipo.descricao, # Usando a descrição do tipo
valor=random.uniform(50, 500),
data_pagamento=fake.date_between(start_date='-1y', end_date='today')
)
db.add(pagamento)
db.commit()
except Exception as e:
print(f"Erro ao criar pagamentos para militante {militante.nome}: {e}")
db.rollback()
def criar_materiais_vendidos(db, militantes):
"""Cria registros de materiais vendidos"""
print("\nCriando materiais vendidos...")
tipos_material = db.query(TipoMaterial).all()
for militante in militantes:
try:
# Criar entre 2 e 5 materiais vendidos por militante
for _ in range(random.randint(2, 5)):
material = MaterialVendido(
militante_id=militante.id,
tipo_material_id=random.choice(tipos_material).id,
descricao=fake.sentence(),
valor=random.uniform(20, 100),
data_venda=fake.date_time_between(start_date='-1y', end_date='now')
)
db.add(material)
db.commit()
except Exception as e:
print(f"Erro ao criar materiais vendidos para militante {militante.nome}: {e}")
db.rollback()
def criar_vendas_jornal(db, militantes):
"""Cria vendas de jornal avulso"""
print("\nCriando vendas de jornal...")
for militante in militantes:
try:
# Criar entre 2 e 6 vendas de jornal por militante
for _ in range(random.randint(2, 6)):
quantidade = random.randint(1, 10)
valor_unitario = random.uniform(5, 15)
venda = VendaJornalAvulso(
militante_id=militante.id,
quantidade=quantidade,
valor_total=quantidade * valor_unitario,
data_venda=fake.date_time_between(start_date='-1y', end_date='now')
)
db.add(venda)
db.commit()
except Exception as e:
print(f"Erro ao criar vendas de jornal para militante {militante.nome}: {e}")
db.rollback()
def criar_assinaturas(db, militantes):
"""Cria assinaturas anuais"""
print("\nCriando assinaturas anuais...")
tipos_material = db.query(TipoMaterial).all()
for militante in militantes:
try:
# 30% de chance de ter assinatura
if random.random() < 0.3:
data_inicio = fake.date_time_between(start_date='-1y', end_date='now')
assinatura = AssinaturaAnual(
militante_id=militante.id,
tipo_material_id=random.choice(tipos_material).id,
quantidade=random.randint(1, 3),
valor_total=random.uniform(100, 500),
data_inicio=data_inicio,
data_fim=data_inicio + timedelta(days=365)
)
db.add(assinatura)
db.commit()
except Exception as e:
print(f"Erro ao criar assinatura para militante {militante.nome}: {e}")
db.rollback()
def seed_database():
"""Função principal para popular o banco de dados"""
db = get_db_session()
try:
print("Iniciando população do banco de dados...")
# Criar tipos básicos
criar_tipos_pagamento(db)
criar_tipos_material(db)
# Criar estrutura organizacional
crs, setores = criar_estrutura_organizacional(db)
# Criar militantes (30 militantes para teste)
militantes = criar_militantes(db, 30, setores)
# Criar dados financeiros e materiais
criar_cotas(db, militantes)
criar_pagamentos(db, militantes)
criar_materiais_vendidos(db, militantes)
criar_vendas_jornal(db, militantes)
criar_assinaturas(db, militantes)
print("\nBanco de dados populado com sucesso!")
except Exception as e:
print(f"Erro durante a população do banco: {e}")
db.rollback()
finally:
db.close()
if __name__ == "__main__":
seed_database()