Files
controles/functions/database.py

780 lines
29 KiB
Python

from datetime import datetime, timedelta
from werkzeug.security import generate_password_hash, check_password_hash
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric, Date, Enum
from sqlalchemy.orm import sessionmaker, relationship, backref
import os
import pyotp
from pathlib import Path
from sqlalchemy.pool import NullPool
import secrets
from flask_mail import Message
from flask import url_for
import enum
from flask_login import UserMixin
from .rbac import Role, Permission, role_permissions, user_roles
from .base import Base, engine, Session
# Configurar caminho do banco de dados
db_dir = Path.home() / '.local' / 'share' / 'controles'
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / 'database.db'
SessionLocal = sessionmaker(bind=engine)
def get_db_connection():
"""
Retorna uma nova sessão do banco de dados SQLite e verifica timeout
"""
session = SessionLocal()
try:
# Verificar timeout para usuários logados
usuario_atual = session.query(Usuario).filter(
Usuario.ultimo_login.isnot(None),
Usuario.ultimo_logout.is_(None)
).first()
if usuario_atual and usuario_atual.check_session_timeout():
usuario_atual.logout()
session.commit()
raise Exception("Sessão expirada. Por favor, faça login novamente.")
return session
except Exception as e:
session.close()
raise e
def execute_query(query, params=None):
"""
Executa uma query usando SQLAlchemy
"""
session = get_db_connection()
try:
result = session.execute(query, params)
session.commit()
return result
except Exception as e:
session.rollback()
raise e
finally:
session.close()
class Celula(Base):
__tablename__ = 'celulas'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(100), nullable=False)
setor_id = Column(Integer, ForeignKey('setores.id'))
cr_id = Column(Integer, ForeignKey('comites_regionais.id'))
secretario = Column(Integer, ForeignKey('militantes.id'))
responsavel_financas = Column(Integer, ForeignKey('militantes.id'))
quadro_orientador = Column(String(255))
# Relacionamentos
setor = relationship("Setor", back_populates="celulas")
cr = relationship("ComiteRegional", back_populates="celulas")
militantes = relationship("Militante", back_populates="celula", foreign_keys="[Militante.celula_id]")
secretario_rel = relationship("Militante", foreign_keys=[secretario])
responsavel_financas_rel = relationship("Militante", foreign_keys=[responsavel_financas])
pagamentos = relationship("PagamentoCelula", back_populates="celula")
usuarios = relationship("Usuario", back_populates="celula")
class ComiteRegional(Base):
__tablename__ = 'comites_regionais'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(100), nullable=False)
responsavel_financas = Column(Integer, ForeignKey('militantes.id'))
responsavel_formacao = Column(Integer, ForeignKey('militantes.id'))
secretario_organizacao = Column(Integer, ForeignKey('militantes.id'))
correspondente_jornal = Column(Integer, ForeignKey('militantes.id'))
# Relacionamentos
responsavel_financas_rel = relationship("Militante", foreign_keys=[responsavel_financas])
responsavel_formacao_rel = relationship("Militante", foreign_keys=[responsavel_formacao])
secretario_organizacao_rel = relationship("Militante", foreign_keys=[secretario_organizacao])
correspondente_jornal_rel = relationship("Militante", foreign_keys=[correspondente_jornal])
setores = relationship("Setor", back_populates="cr")
celulas = relationship("Celula", back_populates="cr")
usuarios = relationship("Usuario", back_populates="cr")
class EmailMilitante(Base):
__tablename__ = 'emails_militantes'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
endereco_email = Column(String(100))
militante = relationship("Militante", back_populates="emails")
class Endereco(Base):
__tablename__ = 'enderecos'
id = Column(Integer, primary_key=True, autoincrement=True)
estado = Column(String(2))
cidade = Column(String(50))
bairro = Column(String(50))
rua = Column(String(100))
numero = Column(String(10))
complemento = Column(String(50))
cep = Column(String(9))
militantes = relationship("Militante", back_populates="endereco")
class RedeSocial(Base):
__tablename__ = 'redes_sociais'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
tipo = Column(String(20)) # Instagram, TikTok, Discord, etc.
identificador = Column(String(100))
militante = relationship("Militante", back_populates="redes_sociais")
class Militante(Base):
__tablename__ = 'militantes'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(100), nullable=False)
cpf = Column(String(14), unique=True)
# Novos campos básicos
titulo_eleitoral = Column(String(20))
data_nascimento = Column(Date)
data_entrada_oci = Column(Date)
data_efetivacao_oci = Column(Date)
# Campos de contato
telefone1 = Column(String(15))
telefone2 = Column(String(15))
# Relacionamento para múltiplos emails
emails = relationship("EmailMilitante", back_populates="militante")
# Endereço
endereco_id = Column(Integer, ForeignKey('enderecos.id'))
endereco = relationship("Endereco", back_populates="militantes")
# Redes sociais
redes_sociais = relationship("RedeSocial", back_populates="militante")
# Campos profissionais
profissao = Column(String(100))
regime_trabalho = Column(String(50)) # CLT, Estatutário, etc.
empresa = Column(String(100))
contratante = Column(String(100)) # Para terceirizados
# Campos acadêmicos
instituicao_ensino = Column(String(100))
tipo_instituicao = Column(String(20)) # Federal, Estadual, etc.
# Campos sindicais
sindicato = Column(String(100))
cargo_sindical = Column(String(50))
dirigente_sindical = Column(Boolean)
central_sindical = Column(String(100))
# Responsável pelo cadastro
registrado_por = Column(Integer, ForeignKey('militantes.id'))
# Campos existentes
celula_id = Column(Integer, ForeignKey('celulas.id'))
responsabilidades = Column(Integer, default=0)
otp_secret = Column(String(32))
temp_token = Column(String(64))
temp_token_expiry = Column(DateTime)
# Novo campo para Quadro-Orientador
quadro_orientador = Column(Boolean, default=False)
# Campos para Aspirante
aspirante = Column(Boolean, default=True) # Por padrão, todo novo militante é aspirante
data_inicio_aspirante = Column(DateTime, default=datetime.utcnow)
avaliacao_aspirante = Column(Text)
data_avaliacao_aspirante = Column(DateTime)
# Relacionamentos existentes
cotas_mensais = relationship("CotaMensal", back_populates="militante")
pagamentos = relationship("Pagamento", back_populates="militante")
materiais_vendidos = relationship("MaterialVendido", back_populates="militante")
vendas_jornais = relationship("VendaJornalAvulso", back_populates="militante")
assinaturas = relationship("AssinaturaAnual", back_populates="militante")
celula = relationship("Celula", back_populates="militantes", foreign_keys=[celula_id])
# Constantes para responsabilidades
SECRETARIO = 1
TESOUREIRO = 2
IMPRENSA = 4
MNS = 8
MPS = 16
JUVENTUDE = 32
QUADRO_ORIENTADOR = 64
ASPIRANTE = 128
RESPONSAVEL_FINANCAS = 256
RESPONSAVEL_IMPRENSA = 512
@staticmethod
def get_responsabilidades_list():
return [
(Militante.SECRETARIO, "Secretário"),
(Militante.TESOUREIRO, "Tesoureiro"),
(Militante.IMPRENSA, "Imprensa"),
(Militante.MNS, "MNS"),
(Militante.MPS, "MPS"),
(Militante.JUVENTUDE, "Juventude"),
(Militante.QUADRO_ORIENTADOR, "Quadro-Orientador"),
(Militante.ASPIRANTE, "Aspirante"),
(Militante.RESPONSAVEL_FINANCAS, "Responsável de Finanças"),
(Militante.RESPONSAVEL_IMPRENSA, "Responsável de Imprensa")
]
def set_responsabilidades(self, resp_list):
"""
Define as responsabilidades do militante
resp_list: lista de inteiros representando as responsabilidades
"""
self.responsabilidades = sum(resp_list)
def get_responsabilidades(self):
"""
Retorna lista de responsabilidades ativas
"""
resp = []
for valor, nome in self.get_responsabilidades_list():
if self.responsabilidades & valor:
resp.append(nome)
return resp
def generate_temp_token(self):
"""
Gera um token temporário para acesso ao QR code
"""
self.temp_token = secrets.token_urlsafe(32)
self.temp_token_expiry = datetime.now() + timedelta(hours=48)
return self.temp_token
def send_otp_email(self, mail):
"""
Envia email com link para QR code
"""
token = self.generate_temp_token()
qr_url = url_for('get_qr_code', token=token, _external=True)
msg = Message(
'Configuração de Autenticação em Duas Etapas',
recipients=[self.email]
)
msg.body = f"""
Olá {self.nome},
Para configurar sua autenticação em duas etapas, acesse o link abaixo:
{qr_url}
Este link expirará em 48 horas.
Instruções:
1. Instale um aplicativo autenticador (Google Authenticator, Microsoft Authenticator)
2. Acesse o link acima
3. Escaneie o QR code com o aplicativo
4. Use o código gerado para fazer login no sistema
Atenciosamente,
Sistema de Controles
"""
mail.send(msg)
def generate_username(self):
"""Gera um nome de usuário único baseado no primeiro nome e um código"""
from sqlalchemy import func
db = get_db_connection()
try:
# Pega o primeiro nome
primeiro_nome = self.nome.split()[0].lower()
# Conta quantos usuários já existem com esse prefixo
count = db.query(func.count(Usuario.id)).filter(
Usuario.username.like(f"{primeiro_nome}%")
).scalar()
# Gera o código (número sequencial)
codigo = str(count + 1).zfill(3)
return f"{primeiro_nome}{codigo}"
finally:
db.close()
class CotaMensal(Base):
__tablename__ = 'cotas_mensais'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
valor_antigo = Column(Numeric(10, 2), nullable=False)
valor_novo = Column(Numeric(10, 2), nullable=False)
data_alteracao = Column(Date, nullable=False)
militante = relationship("Militante", back_populates="cotas_mensais")
class TipoPagamento(Base):
__tablename__ = 'tipos_pagamento'
id = Column(Integer, primary_key=True, autoincrement=True)
descricao = Column(String(100), nullable=False)
class Pagamento(Base):
__tablename__ = 'pagamentos'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
tipo_pagamento = Column(String(50)) # Cota, Jornal, Assinatura, etc.
mes_referencia = Column(Date)
numero_jornal = Column(String(20))
numero_inicial_assinatura = Column(String(20))
numero_final_assinatura = Column(String(20))
campanha_financeira = Column(String(50))
valor = Column(Numeric(10, 2), nullable=False)
data_pagamento = Column(Date, nullable=False)
militante = relationship("Militante", back_populates="pagamentos")
transacoes_pix = relationship("TransacaoPIX", back_populates="pagamento")
class TipoMaterial(Base):
__tablename__ = 'tipos_materiais'
id = Column(Integer, primary_key=True, autoincrement=True)
descricao = Column(String(100), nullable=False)
materiais_vendidos = relationship("MaterialVendido", back_populates="tipo_material")
assinaturas = relationship("AssinaturaAnual", back_populates="tipo_material")
class MaterialVendido(Base):
__tablename__ = 'materiais_vendidos'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
tipo_material_id = Column(Integer, ForeignKey('tipos_materiais.id'))
descricao = Column(String(255), nullable=False)
valor = Column(Numeric(10, 2), nullable=False)
data_venda = Column(Date, nullable=False)
militante = relationship("Militante", back_populates="materiais_vendidos")
tipo_material = relationship("TipoMaterial", back_populates="materiais_vendidos")
class VendaJornalAvulso(Base):
__tablename__ = 'vendas_jornais_avulsos'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
quantidade = Column(Integer, nullable=False)
valor_total = Column(Numeric(10, 2), nullable=False)
data_venda = Column(Date, nullable=False)
militante = relationship("Militante", back_populates="vendas_jornais")
class AssinaturaAnual(Base):
__tablename__ = 'assinaturas_anuais'
id = Column(Integer, primary_key=True, autoincrement=True)
militante_id = Column(Integer, ForeignKey('militantes.id'))
tipo_material_id = Column(Integer, ForeignKey('tipos_materiais.id'))
quantidade = Column(Integer, nullable=False)
valor_total = Column(Numeric(10, 2), nullable=False)
data_inicio = Column(Date, nullable=False)
data_fim = Column(Date, nullable=False)
militante = relationship("Militante", back_populates="assinaturas")
tipo_material = relationship("TipoMaterial", back_populates="assinaturas")
class Setor(Base):
__tablename__ = 'setores'
id = Column(Integer, primary_key=True)
nome = Column(String(100), nullable=False)
cr_id = Column(Integer, ForeignKey('comites_regionais.id'))
responsavel = Column(Integer, ForeignKey('militantes.id'))
responsavel_financas = Column(Integer, ForeignKey('militantes.id'))
# Relacionamentos
cr = relationship("ComiteRegional", back_populates="setores")
responsavel_rel = relationship("Militante", foreign_keys=[responsavel])
responsavel_financas_rel = relationship("Militante", foreign_keys=[responsavel_financas])
usuarios = relationship("Usuario", back_populates="setor")
celulas = relationship("Celula", back_populates="setor")
relatorios_cotas = relationship("RelatorioCotasMensais", back_populates="setor")
relatorios_vendas = relationship("RelatorioVendasMateriais", back_populates="setor")
class ComiteCentral(Base):
__tablename__ = 'comites_centrais'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(100), nullable=False)
relatorios_cotas = relationship("RelatorioCotasMensais", back_populates="comite")
relatorios_vendas = relationship("RelatorioVendasMateriais", back_populates="comite")
class RelatorioCotasMensais(Base):
__tablename__ = 'relatorio_cotas_mensais'
id = Column(Integer, primary_key=True, autoincrement=True)
setor_id = Column(Integer, ForeignKey('setores.id'))
comite_id = Column(Integer, ForeignKey('comites_centrais.id'))
total_cotas = Column(Numeric(10, 2), nullable=False)
data_relatorio = Column(Date, nullable=False)
setor = relationship("Setor", back_populates="relatorios_cotas")
comite = relationship("ComiteCentral", back_populates="relatorios_cotas")
class RelatorioVendasMateriais(Base):
__tablename__ = 'relatorio_vendas_materiais'
id = Column(Integer, primary_key=True, autoincrement=True)
setor_id = Column(Integer, ForeignKey('setores.id'))
comite_id = Column(Integer, ForeignKey('comites_centrais.id'))
total_vendas = Column(Numeric(10, 2), nullable=False)
data_relatorio = Column(Date, nullable=False)
setor = relationship("Setor", back_populates="relatorios_vendas")
comite = relationship("ComiteCentral", back_populates="relatorios_vendas")
class TipoUsuario(enum.Enum):
ADMIN = "admin"
CR_RESPONSAVEL = "cr_responsavel"
SETOR_RESPONSAVEL = "setor_responsavel"
USUARIO = "usuario"
class Usuario(Base, UserMixin):
__tablename__ = 'usuarios'
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
email = Column(String(100), unique=True, nullable=False)
otp_secret = Column(String(32))
role_id = Column(Integer, ForeignKey('roles.id'))
setor_id = Column(Integer, ForeignKey('setores.id'))
ativo = Column(Boolean, default=True)
is_admin = Column(Boolean, default=False)
ultimo_login = Column(DateTime)
ultimo_logout = Column(DateTime)
motivo_logout = Column(String(100))
cr_id = Column(Integer, ForeignKey('comites_regionais.id'))
celula_id = Column(Integer, ForeignKey('celulas.id'))
session_timeout = Column(Integer, default=30)
tipo = Column(String(17), nullable=False)
ultima_atividade = Column(DateTime, default=datetime.utcnow)
# Relacionamento com militante
militante_id = Column(Integer, ForeignKey('militantes.id'))
militante = relationship("Militante", backref=backref("usuario", uselist=False))
# Relacionamentos
roles = relationship("Role", secondary="user_roles", back_populates="users")
setor = relationship('Setor', back_populates='usuarios')
cr = relationship('ComiteRegional', back_populates='usuarios')
celula = relationship('Celula', back_populates='usuarios')
def get_id(self):
return str(self.id)
@property
def is_authenticated(self):
return True
@property
def is_active(self):
return self.ativo
@property
def is_anonymous(self):
return False
def __init__(self, username, password, is_admin=False, email=None, tipo="USUARIO"):
self.username = username
self.password_hash = generate_password_hash(password)
self.is_admin = is_admin
self.email = email
self.ativo = True
self.session_timeout = 30
self.tipo = tipo
self.ultima_atividade = datetime.utcnow()
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def update_last_activity(self):
self.ultima_atividade = datetime.utcnow()
def is_session_expired(self):
if not self.ultima_atividade:
return True
time_diff = datetime.utcnow() - self.ultima_atividade
return time_diff.total_seconds() > (self.session_timeout * 60)
def check_session_timeout(self):
"""Verifica se a sessão do usuário expirou"""
if not self.ultima_atividade:
return True
time_diff = datetime.utcnow() - self.ultima_atividade
return time_diff.total_seconds() > (self.session_timeout * 60)
def has_permission(self, permission_name):
"""Verifica se o usuário tem uma determinada permissão"""
for role in self.roles:
for permission in role.permissions:
if permission.nome == permission_name:
return True
return False
def has_role(self, role_nivel):
"""Verifica se o usuário tem um determinado nível de role"""
for role in self.roles:
if role.nivel == role_nivel:
return True
return False
def get_otp_uri(self):
"""Gera a URI para autenticação em duas etapas"""
if not self.otp_secret:
self.otp_secret = pyotp.random_base32()
return pyotp.totp.TOTP(self.otp_secret).provisioning_uri(
self.username,
issuer_name="Sistema de Controles"
)
def verify_otp(self, code):
"""Verifica se um código OTP é válido"""
if not self.otp_secret:
print(f"Erro: OTP secret não configurado para o usuário {self.username}")
return False
print(f"Verificando OTP para usuário {self.username}")
print(f"OTP Secret: {self.otp_secret}")
print(f"Código fornecido: {code}")
totp = pyotp.totp.TOTP(self.otp_secret)
is_valid = totp.verify(code)
print(f"Resultado da verificação: {'Válido' if is_valid else 'Inválido'}")
print(f"Tempo atual: {datetime.utcnow()}")
print(f"Período atual: {totp.timecode(datetime.utcnow())}")
return is_valid
def logout(self):
"""Registra o logout do usuário"""
self.ultimo_logout = datetime.utcnow()
self.motivo_logout = "Logout manual"
self.ultima_atividade = None
class PagamentoCelula(Base):
__tablename__ = 'pagamentos_celula'
id = Column(Integer, primary_key=True, autoincrement=True)
celula_id = Column(Integer, ForeignKey('celulas.id'))
data = Column(Date)
valor = Column(Numeric(10, 2))
metodo_pagamento = Column(String(20)) # PIX, Dinheiro, etc.
codigo_pix = Column(String(100))
descricao = Column(String(255))
registrado_por = Column(Integer, ForeignKey('militantes.id'))
celula = relationship("Celula", back_populates="pagamentos")
registrado_por_rel = relationship("Militante", foreign_keys=[registrado_por])
class Atividade(Base):
__tablename__ = 'atividades'
id = Column(Integer, primary_key=True, autoincrement=True)
descricao = Column(String(255))
data = Column(Date)
responsavel1 = Column(Integer, ForeignKey('militantes.id'))
responsavel2 = Column(Integer, ForeignKey('militantes.id'))
responsavel1_rel = relationship("Militante", foreign_keys=[responsavel1])
responsavel2_rel = relationship("Militante", foreign_keys=[responsavel2])
materiais = relationship("MaterialAtividade", back_populates="atividade")
class MaterialAtividade(Base):
__tablename__ = 'materiais_atividades'
id = Column(Integer, primary_key=True, autoincrement=True)
atividade_id = Column(Integer, ForeignKey('atividades.id'))
tipo = Column(String(20)) # Jornal, Revista, etc.
quantidade = Column(Integer)
detalhes = Column(String(255))
atividade = relationship("Atividade", back_populates="materiais")
class Relatorio(Base):
__tablename__ = 'relatorios'
id = Column(Integer, primary_key=True, autoincrement=True)
tipo = Column(String(50)) # Semanal, Quinzenal, Mensal
periodo_inicio = Column(Date)
periodo_fim = Column(Date)
gerado_por = Column(Integer, ForeignKey('militantes.id'))
conteudo = Column(Text)
# Relacionamento hierárquico
celula_id = Column(Integer, ForeignKey('celulas.id'))
setor_id = Column(Integer, ForeignKey('setores.id'))
cr_id = Column(Integer, ForeignKey('comites_regionais.id'))
gerado_por_rel = relationship("Militante", foreign_keys=[gerado_por])
celula = relationship("Celula", foreign_keys=[celula_id])
setor = relationship("Setor", foreign_keys=[setor_id])
cr = relationship("ComiteRegional", foreign_keys=[cr_id])
class TransacaoPIX(Base):
__tablename__ = 'transacoes_pix'
id = Column(Integer, primary_key=True, autoincrement=True)
chave_pix = Column(String(100))
valor = Column(Numeric(10, 2))
data_geracao = Column(DateTime)
data_pagamento = Column(DateTime)
status = Column(String(20)) # Pendente, Pago, Expirado
qr_code = Column(Text)
pagamento_id = Column(Integer, ForeignKey('pagamentos.id'))
pagamento = relationship("Pagamento", back_populates="transacoes_pix")
# Remover o banco de dados existente (se existir)
if os.path.exists(db_path):
os.remove(db_path)
def init_rbac():
"""Inicializa o sistema RBAC"""
print("Inicializando sistema RBAC...")
session = SessionLocal()
try:
# Verificar se já existe um admin
admin = session.query(Usuario).filter_by(username="admin").first()
if not admin:
print("Criando role de administrador...")
# Criar role de admin
admin_role = session.query(Role).filter_by(nome="Administrador").first()
if not admin_role:
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL)
session.add(admin_role)
session.commit()
print("Criando usuário admin...")
# Criar usuário admin
admin = Usuario(
username="admin",
password="admin123",
is_admin=True
)
admin.email = "admin@example.com"
admin.role_id = admin_role.id
# Adicionar apenas a permissão de system_config ao admin
permission = session.query(Permission).filter_by(nome='system_config').first()
if permission and permission not in admin_role.permissions:
admin_role.permissions.append(permission)
session.add(admin)
session.commit()
print("=== Usuário Admin Criado ===")
print(f"Username: admin")
print(f"Senha: admin123")
print(f"Email: {admin.email}")
print(f"OTP Secret: {admin.otp_secret}")
else:
print("Usuário admin já existe")
# Garantir que o admin tenha apenas a permissão de system_config
admin_role = session.query(Role).filter_by(nome="Administrador").first()
if admin_role:
# Remover todas as permissões atuais
admin_role.permissions = []
# Adicionar apenas a permissão de system_config
permission = session.query(Permission).filter_by(nome='system_config').first()
if permission:
admin_role.permissions.append(permission)
session.commit()
except Exception as e:
print(f"Erro na inicialização do sistema RBAC: {e}")
session.rollback()
raise
finally:
session.close()
def init_database():
"""Inicializa o banco de dados com dados básicos"""
print("Inicializando banco de dados...")
# Criar todas as tabelas
Base.metadata.drop_all(engine) # Remover todas as tabelas existentes
Base.metadata.create_all(engine)
session = SessionLocal()
try:
# Criar role de administrador
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL)
session.add(admin_role)
session.commit()
# Verificar se existe um QR code salvo
qr_path = Path('admin_qr.png')
admin_otp_secret = None
if qr_path.exists():
# Extrair o segredo OTP do nome do arquivo temporário dentro do QR
try:
import re
with open('admin_qr.txt', 'r') as f:
qr_content = f.read()
# O segredo OTP está no formato otpauth://totp/admin?secret=XXXXX&issuer=Sistema%20de%20Controles
match = re.search(r'secret=([A-Z0-9]+)&', qr_content)
if match:
admin_otp_secret = match.group(1)
print(f"Usando OTP existente: {admin_otp_secret}")
except Exception as e:
print(f"Erro ao ler OTP existente: {e}")
if not admin_otp_secret:
admin_otp_secret = pyotp.random_base32()
print(f"Novo OTP gerado: {admin_otp_secret}")
# Criar usuário admin
admin = Usuario(
username="admin",
password="admin123",
is_admin=True,
email="admin@example.com",
tipo="ADMIN"
)
admin.role_id = admin_role.id
admin.otp_secret = admin_otp_secret
session.add(admin)
session.commit()
# Gerar novo QR code se não existir
if not qr_path.exists():
totp = pyotp.totp.TOTP(admin_otp_secret)
provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles")
# Salvar a URI em um arquivo texto para referência futura
with open('admin_qr.txt', 'w') as f:
f.write(provisioning_uri)
# Gerar QR code
import qrcode
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img.save('admin_qr.png')
print("=== Usuário Admin Criado ===")
print(f"Username: admin")
print(f"Senha: admin123")
print(f"Email: {admin.email}")
print(f"OTP Secret: {admin.otp_secret}")
print(f"QR Code: {qr_path}")
except Exception as e:
print(f"Erro na inicialização do banco: {e}")
session.rollback()
raise
finally:
session.close()
# Inicializar o sistema RBAC
init_rbac()
# Inicializar o banco de dados automaticamente quando o módulo for importado
init_database()
# Executar a criação dos dados iniciais
if __name__ == "__main__":
init_database()