diff --git a/functions/base.py b/functions/base.py index 7be06e9..fa4d546 100644 --- a/functions/base.py +++ b/functions/base.py @@ -1,14 +1,26 @@ -from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker +from pathlib import Path import os -# Configuração do banco de dados -DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///database.db') -engine = create_engine(DATABASE_URL) -Session = sessionmaker(bind=engine) +# Configurar caminho do banco de dados +db_dir = Path.home() / '.local' / 'share' / 'controles' +db_dir.mkdir(parents=True, exist_ok=True) +db_path = db_dir / 'database.db' -# Base declarativa do SQLAlchemy +# Configurar SQLite com opções para melhor concorrência +engine = create_engine( + f'sqlite:///{db_path}', + connect_args={ + 'timeout': 30, # Tempo de espera em segundos + 'check_same_thread': False # Permite acesso de múltiplas threads + }, + pool_pre_ping=True, # Verifica conexão antes de usar + pool_recycle=3600 # Recicla conexões após 1 hora +) + +Session = sessionmaker(bind=engine) Base = declarative_base() def get_db_connection(): diff --git a/functions/database.py b/functions/database.py index ee76a07..77c5d6d 100644 --- a/functions/database.py +++ b/functions/database.py @@ -23,29 +23,15 @@ db_path = db_dir / 'database.db' SessionLocal = sessionmaker(bind=engine) def get_db_connection(): - """ - Retorna uma nova sessão do banco de dados SQLite e verifica timeout - """ - session = SessionLocal() + """Retorna uma nova conexão com o banco de dados""" + db = SessionLocal() try: - # Verificar timeout para usuários logados - usuario_atual = session.query(Usuario).filter( - Usuario.ultimo_login.isnot(None), - Usuario.ultimo_logout.is_(None) - ).first() - - if usuario_atual and usuario_atual.check_session_timeout(): - usuario_atual.logout() - session.commit() - raise Exception("Sessão expirada. Por favor, faça login novamente.") - - # Testa a conexão usando text() - session.execute(text("SELECT 1")) - return session - except Exception as e: - print(f"Erro ao conectar ao banco de dados: {str(e)}") - if session: - session.close() + # Configurar SQLite para melhor tratamento de concorrência + db.execute(text("PRAGMA journal_mode=WAL")) + db.execute(text("PRAGMA busy_timeout=5000")) + return db + except: + db.close() raise def execute_query(query, params=None): @@ -635,15 +621,43 @@ def init_database(): """Inicializa o banco de dados com dados básicos""" print("Inicializando banco de dados...") - # Criar todas as tabelas - Base.metadata.drop_all(engine) # Remover todas as tabelas existentes - Base.metadata.create_all(engine) - - session = SessionLocal() + session = get_db_connection() try: - # Criar role de administrador - admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL) - session.add(admin_role) + # Configurar SQLite para melhor tratamento de concorrência + session.execute(text("PRAGMA journal_mode=WAL")) + session.execute(text("PRAGMA busy_timeout=5000")) + + # Criar todas as tabelas + Base.metadata.drop_all(engine) # Remover todas as tabelas existentes + Base.metadata.create_all(engine) + + # Criar roles padrão + roles = [ + ("Administrador", Role.SECRETARIO_GERAL), + ("Secretário", Role.SECRETARIO_CELULA), + ("Militante", Role.MILITANTE_BASICO) + ] + + for nome, nivel in roles: + if not session.query(Role).filter_by(nome=nome).first(): + role = Role(nome=nome, nivel=nivel) + session.add(role) + session.commit() + + # Criar setores padrão + setores = ["Setor 1", "Setor 2", "Setor 3"] + for nome in setores: + if not session.query(Setor).filter_by(nome=nome).first(): + setor = Setor(nome=nome) + session.add(setor) + session.commit() + + # Criar comitês padrão + comites = ["Comitê 1", "Comitê 2", "Comitê 3"] + for nome in comites: + if not session.query(ComiteCentral).filter_by(nome=nome).first(): + comite = ComiteCentral(nome=nome) + session.add(comite) session.commit() # Verificar se existe um QR code salvo @@ -651,12 +665,10 @@ def init_database(): admin_otp_secret = None if qr_path.exists(): - # Extrair o segredo OTP do nome do arquivo temporário dentro do QR try: import re with open('admin_qr.txt', 'r') as f: qr_content = f.read() - # O segredo OTP está no formato otpauth://totp/admin?secret=XXXXX&issuer=Sistema%20de%20Controles match = re.search(r'secret=([A-Z0-9]+)&', qr_content) if match: admin_otp_secret = match.group(1) @@ -669,6 +681,9 @@ def init_database(): print(f"Novo OTP gerado: {admin_otp_secret}") # Criar usuário admin + admin_role = session.query(Role).filter_by(nome="Administrador").first() + setor = session.query(Setor).first() + admin = Usuario( username="admin", email="admin@example.com", @@ -676,8 +691,9 @@ def init_database(): ) admin.set_password("admin123") admin.tipo = "ADMIN" - admin.role_id = admin_role.id admin.otp_secret = admin_otp_secret + admin.roles.append(admin_role) + admin.setor = setor session.add(admin) session.commit() @@ -686,11 +702,9 @@ def init_database(): totp = pyotp.totp.TOTP(admin_otp_secret) provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles") - # Salvar a URI em um arquivo texto para referência futura with open('admin_qr.txt', 'w') as f: f.write(provisioning_uri) - # Gerar QR code import qrcode qr = qrcode.QRCode(version=1, box_size=10, border=5) qr.add_data(provisioning_uri) @@ -705,6 +719,12 @@ def init_database(): print(f"OTP Secret: {admin.otp_secret}") print(f"QR Code: {qr_path}") + # Importar e executar o seed após criar todas as dependências + from seed_data import seed_database + print("\nPopulando banco de dados com dados de teste...") + seed_database() + print("Dados de teste criados com sucesso!") + except Exception as e: print(f"Erro na inicialização do banco: {e}") session.rollback()