fix: ajusta configurações do SQLite para melhor concorrência e tratamento de locks

This commit is contained in:
andersonid
2025-04-04 09:24:34 -03:00
parent c17a3eaa0f
commit 9bb62c81a7
2 changed files with 73 additions and 41 deletions

View File

@@ -1,14 +1,26 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from pathlib import Path
import os import os
# Configuração do banco de dados # Configurar caminho do banco de dados
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///database.db') db_dir = Path.home() / '.local' / 'share' / 'controles'
engine = create_engine(DATABASE_URL) db_dir.mkdir(parents=True, exist_ok=True)
Session = sessionmaker(bind=engine) db_path = db_dir / 'database.db'
# Base declarativa do SQLAlchemy # Configurar SQLite com opções para melhor concorrência
engine = create_engine(
f'sqlite:///{db_path}',
connect_args={
'timeout': 30, # Tempo de espera em segundos
'check_same_thread': False # Permite acesso de múltiplas threads
},
pool_pre_ping=True, # Verifica conexão antes de usar
pool_recycle=3600 # Recicla conexões após 1 hora
)
Session = sessionmaker(bind=engine)
Base = declarative_base() Base = declarative_base()
def get_db_connection(): def get_db_connection():

View File

@@ -23,29 +23,15 @@ db_path = db_dir / 'database.db'
SessionLocal = sessionmaker(bind=engine) SessionLocal = sessionmaker(bind=engine)
def get_db_connection(): def get_db_connection():
""" """Retorna uma nova conexão com o banco de dados"""
Retorna uma nova sessão do banco de dados SQLite e verifica timeout db = SessionLocal()
"""
session = SessionLocal()
try: try:
# Verificar timeout para usuários logados # Configurar SQLite para melhor tratamento de concorrência
usuario_atual = session.query(Usuario).filter( db.execute(text("PRAGMA journal_mode=WAL"))
Usuario.ultimo_login.isnot(None), db.execute(text("PRAGMA busy_timeout=5000"))
Usuario.ultimo_logout.is_(None) return db
).first() except:
db.close()
if usuario_atual and usuario_atual.check_session_timeout():
usuario_atual.logout()
session.commit()
raise Exception("Sessão expirada. Por favor, faça login novamente.")
# Testa a conexão usando text()
session.execute(text("SELECT 1"))
return session
except Exception as e:
print(f"Erro ao conectar ao banco de dados: {str(e)}")
if session:
session.close()
raise raise
def execute_query(query, params=None): def execute_query(query, params=None):
@@ -635,15 +621,43 @@ def init_database():
"""Inicializa o banco de dados com dados básicos""" """Inicializa o banco de dados com dados básicos"""
print("Inicializando banco de dados...") print("Inicializando banco de dados...")
session = get_db_connection()
try:
# Configurar SQLite para melhor tratamento de concorrência
session.execute(text("PRAGMA journal_mode=WAL"))
session.execute(text("PRAGMA busy_timeout=5000"))
# Criar todas as tabelas # Criar todas as tabelas
Base.metadata.drop_all(engine) # Remover todas as tabelas existentes Base.metadata.drop_all(engine) # Remover todas as tabelas existentes
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
session = SessionLocal() # Criar roles padrão
try: roles = [
# Criar role de administrador ("Administrador", Role.SECRETARIO_GERAL),
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL) ("Secretário", Role.SECRETARIO_CELULA),
session.add(admin_role) ("Militante", Role.MILITANTE_BASICO)
]
for nome, nivel in roles:
if not session.query(Role).filter_by(nome=nome).first():
role = Role(nome=nome, nivel=nivel)
session.add(role)
session.commit()
# Criar setores padrão
setores = ["Setor 1", "Setor 2", "Setor 3"]
for nome in setores:
if not session.query(Setor).filter_by(nome=nome).first():
setor = Setor(nome=nome)
session.add(setor)
session.commit()
# Criar comitês padrão
comites = ["Comitê 1", "Comitê 2", "Comitê 3"]
for nome in comites:
if not session.query(ComiteCentral).filter_by(nome=nome).first():
comite = ComiteCentral(nome=nome)
session.add(comite)
session.commit() session.commit()
# Verificar se existe um QR code salvo # Verificar se existe um QR code salvo
@@ -651,12 +665,10 @@ def init_database():
admin_otp_secret = None admin_otp_secret = None
if qr_path.exists(): if qr_path.exists():
# Extrair o segredo OTP do nome do arquivo temporário dentro do QR
try: try:
import re import re
with open('admin_qr.txt', 'r') as f: with open('admin_qr.txt', 'r') as f:
qr_content = f.read() qr_content = f.read()
# O segredo OTP está no formato otpauth://totp/admin?secret=XXXXX&issuer=Sistema%20de%20Controles
match = re.search(r'secret=([A-Z0-9]+)&', qr_content) match = re.search(r'secret=([A-Z0-9]+)&', qr_content)
if match: if match:
admin_otp_secret = match.group(1) admin_otp_secret = match.group(1)
@@ -669,6 +681,9 @@ def init_database():
print(f"Novo OTP gerado: {admin_otp_secret}") print(f"Novo OTP gerado: {admin_otp_secret}")
# Criar usuário admin # Criar usuário admin
admin_role = session.query(Role).filter_by(nome="Administrador").first()
setor = session.query(Setor).first()
admin = Usuario( admin = Usuario(
username="admin", username="admin",
email="admin@example.com", email="admin@example.com",
@@ -676,8 +691,9 @@ def init_database():
) )
admin.set_password("admin123") admin.set_password("admin123")
admin.tipo = "ADMIN" admin.tipo = "ADMIN"
admin.role_id = admin_role.id
admin.otp_secret = admin_otp_secret admin.otp_secret = admin_otp_secret
admin.roles.append(admin_role)
admin.setor = setor
session.add(admin) session.add(admin)
session.commit() session.commit()
@@ -686,11 +702,9 @@ def init_database():
totp = pyotp.totp.TOTP(admin_otp_secret) totp = pyotp.totp.TOTP(admin_otp_secret)
provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles") provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles")
# Salvar a URI em um arquivo texto para referência futura
with open('admin_qr.txt', 'w') as f: with open('admin_qr.txt', 'w') as f:
f.write(provisioning_uri) f.write(provisioning_uri)
# Gerar QR code
import qrcode import qrcode
qr = qrcode.QRCode(version=1, box_size=10, border=5) qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri) qr.add_data(provisioning_uri)
@@ -705,6 +719,12 @@ def init_database():
print(f"OTP Secret: {admin.otp_secret}") print(f"OTP Secret: {admin.otp_secret}")
print(f"QR Code: {qr_path}") print(f"QR Code: {qr_path}")
# Importar e executar o seed após criar todas as dependências
from seed_data import seed_database
print("\nPopulando banco de dados com dados de teste...")
seed_database()
print("Dados de teste criados com sucesso!")
except Exception as e: except Exception as e:
print(f"Erro na inicialização do banco: {e}") print(f"Erro na inicialização do banco: {e}")
session.rollback() session.rollback()