diff --git a/.gitignore b/.gitignore index 73baf91..79f62d2 100644 --- a/.gitignore +++ b/.gitignore @@ -260,5 +260,6 @@ poetry.toml pyrightconfig.json database.db +admin_qr.png -# End of https://www.toptal.com/developers/gitignore/api/python,flask \ No newline at end of file +# End of https://www.toptal.com/developers/gitignore/api/python,flask diff --git a/Makefile b/Makefile index bc6c28a..969466b 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,12 @@ install: pip install -r requirements.txt -run: +clean: + rm -rf ~/.local/share/controles/database.db + rm -f admin_qr.png + +run: clean python app.py + +reset-admin: clean + python create_admin.py diff --git a/README.md b/README.md index 62b96f3..c4a2e1c 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# controles +# Sistema de Controles ## Para instalar @@ -6,9 +6,37 @@ make install ``` -## Para executar +## Sobre o QR Code de Autenticação (admin_qr.png) +O arquivo `admin_qr.png` é um QR code gerado automaticamente para configuração da autenticação de dois fatores (2FA) do usuário administrador. Este arquivo é: + +- Gerado na raiz do projeto quando: + - O comando `make reset-admin` é executado + - O servidor é iniciado com `make run` e existe um usuário admin + - Um novo usuário é criado através da interface web + +- Usado para: + - Configurar a autenticação 2FA no aplicativo autenticador (Google Authenticator, Microsoft Authenticator, etc.) + - Gerar os códigos OTP necessários para fazer login no sistema + +### Importante: +- O QR code é atualizado sempre que um novo usuário é criado ou quando o sistema é reiniciado +- Cada QR code é único e corresponde ao segredo OTP atual do usuário +- Se você recriar o banco de dados ou resetar o admin, será necessário reconfigurar o aplicativo autenticador com o novo QR code +- Mantenha este arquivo seguro, pois ele contém informações sensíveis de autenticação + +### Como usar: +1. Instale um aplicativo autenticador no seu celular (Google Authenticator, Microsoft Authenticator, etc.) +2. Escaneie o QR code (`admin_qr.png`) com o aplicativo +3. O aplicativo irá gerar códigos de 6 dígitos a cada 30 segundos +4. Use estes códigos junto com seu usuário e senha para fazer login no sistema + +### Comandos relacionados: ```bash +# Limpar banco e criar novo admin com novo QR code +make reset-admin + +# Iniciar o servidor (também gera novo QR code se necessário) make run ``` diff --git a/app.py b/app.py index e97959a..29b8924 100644 --- a/app.py +++ b/app.py @@ -1,4 +1,4 @@ -from flask import Flask, request, render_template, redirect, url_for, flash +from flask import Flask, request, render_template, redirect, url_for, flash, session, jsonify from functions.database import ( Base, Militante, @@ -12,34 +12,174 @@ from functions.database import ( AssinaturaAnual, RelatorioCotasMensais, RelatorioVendasMateriais, - engine, + Usuario, + get_db_connection, ) -from sqlalchemy import create_engine, and_ +from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker -from datetime import datetime +from datetime import datetime, timedelta from flask_bootstrap import Bootstrap5 from routes.cota import cota_bp from functions.validations import validar_cpf - -Session = sessionmaker(bind=engine) -session = Session() +from functools import wraps +from pathlib import Path +from time import time +from create_admin import generate_qr_code app = Flask(__name__) -app.secret_key = 'sua_chave_secreta_aqui' +app.secret_key = 'sua_chave_secreta_aqui' # Necessário para sessões do Flask bootstrap = Bootstrap5(app) +# Configuração da sessão do SQLAlchemy +db_session = get_db_connection() -def session_run(model): - session.add(model) +# Decorator para verificar se o usuário está logado +def login_required(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + flash('Por favor, faça login para acessar esta página.', 'warning') + return redirect(url_for('login')) + return f(*args, **kwargs) + return decorated_function + +# Decorator para verificar se a sessão expirou +def session_timeout(f): + @wraps(f) + def decorated_function(*args, **kwargs): + if 'user_id' not in session: + return redirect(url_for('login')) + + # Verificar se existe timestamp do último acesso + if 'last_activity' in session: + last_activity = datetime.fromtimestamp(session['last_activity']) + now = datetime.now() + + # Se passaram mais de 2 horas + if now - last_activity > timedelta(hours=2): + # Registrar o logout por timeout + try: + user = db_session.query(Usuario).get(session['user_id']) + if user: + user.ultimo_logout = datetime.now() + user.motivo_logout = "Timeout de sessão" + db_session.commit() + except Exception as e: + print(f"Erro ao registrar logout por timeout: {e}") + + session.clear() + flash('Sua sessão expirou. Por favor, faça login novamente.', 'warning') + return redirect(url_for('login')) + + # Atualizar timestamp de último acesso + session['last_activity'] = time() + return f(*args, **kwargs) + return decorated_function + +# Rota raiz - redireciona para login se não estiver autenticado +@app.route("/") +def index(): + if 'user_id' not in session: + return redirect(url_for('login')) + return redirect(url_for('home')) + +# Rota de login +@app.route("/login", methods=["GET", "POST"]) +def login(): + if 'user_id' in session: + return redirect(url_for('home')) + + if request.method == "POST": + username = request.form.get("username") + password = request.form.get("password") + otp = request.form.get("otp") + + # Log dos dados recebidos (sem a senha) + print(f"Tentativa de login - Username: {username}, OTP fornecido: {'Sim' if otp else 'Não'}") + + user = db_session.query(Usuario).filter_by(username=username).first() + + if not user: + print(f"Erro: Usuário '{username}' não encontrado") + flash('Usuário não encontrado', 'danger') + return render_template('login.html') + + if not user.check_password(password): + print(f"Erro: Senha incorreta para o usuário '{username}'") + flash('Senha incorreta', 'danger') + return render_template('login.html') + + if not user.verify_otp(otp): + print(f"Erro: Código OTP inválido para o usuário '{username}'") + print(f"OTP fornecido: {otp}") + print(f"OTP secret do usuário: {user.otp_secret}") + flash('Código OTP inválido', 'danger') + return render_template('login.html') + + # Se chegou aqui, login bem sucedido + print(f"Login bem sucedido para o usuário '{username}'") + session['user_id'] = user.id + session['is_admin'] = user.is_admin + session['last_activity'] = time() # Inicializar timestamp + + # Registrar horário do login + user.ultimo_login = datetime.now() + db_session.commit() + + next_page = request.args.get('next') + flash('Login realizado com sucesso!', 'success') + return redirect(next_page or url_for('home')) + + return render_template('login.html') + +# Rota de logout +@app.route("/logout") +def logout(): + session.clear() # Limpa a sessão do Flask + flash('Você foi desconectado com sucesso.', 'info') + return redirect(url_for('login')) + +# Rota home (protegida) +@app.route("/home") +@login_required +@session_timeout +def home(): + """Página inicial do sistema""" try: - session.commit() + links = [] + # Filtrar apenas as rotas que queremos mostrar + allowed_endpoints = { + 'listar_militantes': 'Militantes', + 'listar_cotas': 'Cotas', + 'listar_pagamentos': 'Pagamentos', + 'listar_materiais': 'Materiais', + 'listar_vendas_jornal': 'Vendas de Jornal', + 'listar_assinaturas': 'Assinaturas' + } + + for rule in app.url_map.iter_rules(): + if (rule.endpoint in allowed_endpoints and + "GET" in rule.methods and + len(rule.arguments) == 0): # Apenas rotas sem parâmetros + url = url_for(rule.endpoint) + nome = allowed_endpoints[rule.endpoint] + links.append((url, nome)) + + # Ordenar links pelo nome + links.sort(key=lambda x: x[1]) + + return render_template('home.html', links=links) except Exception as e: - print(e) - session.rollback() - + print(f"Erro na página inicial: {e}") + import traceback + traceback.print_exc() + flash('Erro ao carregar a página inicial', 'error') + return render_template('home.html', links=[]) # Rota para criar um novo militante @app.route("/militantes/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def novo_militante(): if request.method == "POST": cpf = request.form["cpf"] @@ -58,22 +198,32 @@ def novo_militante(): filiado=bool(request.form.get("filiado", False)) ) - session_run(novo_militante) - flash('Militante cadastrado com sucesso!', 'success') - return redirect(url_for("listar_militantes")) + db_session.add(novo_militante) + try: + db_session.commit() + flash('Militante cadastrado com sucesso!', 'success') + return redirect(url_for("listar_militantes")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar militante. Verifique se o CPF ou email já não estão cadastrados.', 'error') + return render_template("novo_militante.html", + dados_anteriores=request.form) return render_template("novo_militante.html") - # Rota para listar militantes @app.route("/militantes") +@login_required +@session_timeout def listar_militantes(): - militantes = session.query(Militante).all() + militantes = db_session.query(Militante).all() return render_template("listar_militantes.html", militantes=militantes) - # Rota para criar uma nova cota mensal @app.route("/cotas/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def nova_cota(): if request.method == "POST": cotas_mensais = CotaMensal( @@ -83,21 +233,30 @@ def nova_cota(): data_alteracao=datetime.strptime(request.form["data_alteracao"], "%Y-%m-%d") ) - session_run(cotas_mensais) - return redirect(url_for("listar_cotas")) + db_session.add(cotas_mensais) + try: + db_session.commit() + return redirect(url_for("listar_cotas")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar cota mensal. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("nova_cota.html") return render_template("nova_cota.html") - # Rota para listar cotas mensais @app.route("/cotas") +@login_required +@session_timeout def listar_cotas(): - cotas = session.query(CotaMensal).all() + cotas = db_session.query(CotaMensal).all() return render_template("listar_cotas.html", cotas=cotas) - # Rota para criar um novo pagamento @app.route("/pagamentos/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def novo_pagamento(): if request.method == "POST": pagamentos = Pagamento( @@ -107,21 +266,30 @@ def novo_pagamento(): data_pagamento=datetime.strptime(request.form["data_pagamento"], "%Y-%m-%d") ) - session_run(pagamentos) - return redirect(url_for("listar_pagamentos")) + db_session.add(pagamentos) + try: + db_session.commit() + return redirect(url_for("listar_pagamentos")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar pagamento. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("novo_pagamento.html") return render_template("novo_pagamento.html") - # Rota para listar pagamentos @app.route("/pagamentos") +@login_required +@session_timeout def listar_pagamentos(): - pagamentos = session.query(Pagamento).all() + pagamentos = db_session.query(Pagamento).all() return render_template("listar_pagamentos.html", pagamentos=pagamentos) - # Rota para criar um novo material vendido @app.route("/materiais/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def novo_material(): if request.method == "POST": materiais_vendidos = MaterialVendido( @@ -132,21 +300,30 @@ def novo_material(): data_venda=datetime.strptime(request.form["data_venda"], "%Y-%m-%d"), ) - session_run(materiais_vendidos) - return redirect(url_for("listar_materiais")) + db_session.add(materiais_vendidos) + try: + db_session.commit() + return redirect(url_for("listar_materiais")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar material vendido. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("novo_material.html") return render_template("novo_material.html") - # Rota para listar materiais vendidos @app.route("/materiais") +@login_required +@session_timeout def listar_materiais(): - materiais = session.query(MaterialVendido).all() + materiais = db_session.query(MaterialVendido).all() return render_template("listar_materiais.html", materiais=materiais) - # Rota para criar uma nova venda de jornais avulsos @app.route("/jornais/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def nova_venda_jornal(): if request.method == "POST": vendas_jornais_avulsos = VendaJornalAvulso( @@ -156,21 +333,30 @@ def nova_venda_jornal(): data_venda=datetime.strptime(request.form["data_venda"], "%Y-%m-%d"), ) - session_run(vendas_jornais_avulsos) - return redirect(url_for("listar_vendas_jornal")) + db_session.add(vendas_jornais_avulsos) + try: + db_session.commit() + return redirect(url_for("listar_vendas_jornal")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar venda de jornal avulso. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("nova_venda_jornal.html") return render_template("nova_venda_jornal.html") - # Rota para listar vendas de jornais avulsos @app.route("/jornais") +@login_required +@session_timeout def listar_vendas_jornal(): - vendas = session.query(VendaJornalAvulso).all() + vendas = db_session.query(VendaJornalAvulso).all() return render_template("listar_vendas_jornal.html", vendas=vendas) - # Rota para criar uma nova assinatura anual @app.route("/assinaturas/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def nova_assinatura(): if request.method == "POST": assinaturas_anuais = AssinaturaAnual( @@ -182,21 +368,30 @@ def nova_assinatura(): data_fim=datetime.strptime(request.form["data_fim"], "%Y-%m-%d") ) - session_run(assinaturas_anuais) - return redirect(url_for("listar_assinaturas")) + db_session.add(assinaturas_anuais) + try: + db_session.commit() + return redirect(url_for("listar_assinaturas")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar assinatura anual. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("nova_assinatura.html") return render_template("nova_assinatura.html") - # Rota para listar assinaturas anuais @app.route("/assinaturas") +@login_required +@session_timeout def listar_assinaturas(): - assinaturas = session.query(AssinaturaAnual).all() + assinaturas = db_session.query(AssinaturaAnual).all() return render_template("listar_assinaturas.html", assinaturas=assinaturas) - # Rota para criar um novo relatório de cotas mensais @app.route("/relatorios/cotas/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def novo_relatorio_cotas(): if request.method == "POST": relatorio_cotas_mensais = RelatorioCotasMensais( @@ -206,21 +401,30 @@ def novo_relatorio_cotas(): data_relatorio=datetime.strptime(request.form["data_relatorio"], "%Y-%m-%d") ) - session_run(relatorio_cotas_mensais) - return redirect(url_for("listar_relatorios_cotas")) + db_session.add(relatorio_cotas_mensais) + try: + db_session.commit() + return redirect(url_for("listar_relatorios_cotas")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar relatório de cotas mensais. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("novo_relatorio_cotas.html") return render_template("novo_relatorio_cotas.html") - # Rota para listar relatórios de cotas mensais @app.route("/relatorios/cotas") +@login_required +@session_timeout def listar_relatorios_cotas(): - relatorios = session.query(RelatorioCotasMensais).all() + relatorios = db_session.query(RelatorioCotasMensais).all() return render_template("listar_relatorios_cotas.html", relatorios=relatorios) - # Rota para criar um novo relatório de vendas de materiais @app.route("/relatorios/vendas/novo", methods=["GET", "POST"]) +@login_required +@session_timeout def novo_relatorio_vendas(): if request.method == "POST": relatorio_vendas_materiais = RelatorioVendasMateriais( @@ -230,42 +434,31 @@ def novo_relatorio_vendas(): data_relatorio=datetime.strptime(request.form["data_relatorio"], "%Y-%m-%d") ) - session_run(relatorio_vendas_materiais) - return redirect(url_for("listar_relatorios_vendas")) + db_session.add(relatorio_vendas_materiais) + try: + db_session.commit() + return redirect(url_for("listar_relatorios_vendas")) + except Exception as e: + print(e) + db_session.rollback() + flash('Erro ao cadastrar relatório de vendas de materiais. Verifique se os dados fornecidos são válidos.', 'error') + return render_template("novo_relatorio_vendas.html") return render_template("novo_relatorio_vendas.html") - # Rota para listar relatórios de vendas de materiais @app.route("/relatorios/vendas") +@login_required +@session_timeout def listar_relatorios_vendas(): - relatorios = session.query(RelatorioVendasMateriais).all() + relatorios = db_session.query(RelatorioVendasMateriais).all() return render_template("listar_relatorios_vendas.html", relatorios=relatorios) - -@app.route("/") -def home(): - """Página inicial do sistema""" - links = [] - for rule in app.url_map.iter_rules(): - if "GET" in rule.methods and has_no_empty_params(rule): - url = url_for(rule.endpoint, **(rule.defaults or {})) - # Substituindo 'home' por 'início' no menu - endpoint_name = 'Início' if rule.endpoint == 'home' else rule.endpoint - links.append((url, endpoint_name)) - - return render_template('home.html', links=links) - - -def has_no_empty_params(rule): - defaults = rule.defaults if rule.defaults is not None else () - arguments = rule.arguments if rule.arguments is not None else () - return len(defaults) >= len(arguments) - - @app.route("/militantes/editar/", methods=["GET", "POST"]) +@login_required +@session_timeout def editar_militante(id): - militante = session.query(Militante).get(id) + militante = db_session.query(Militante).get(id) if not militante: flash('Militante não encontrado.', 'error') return redirect(url_for('listar_militantes')) @@ -284,16 +477,95 @@ def editar_militante(id): militante.endereco = request.form["endereco"] militante.filiado = bool(request.form.get("filiado", False)) - session.commit() + db_session.commit() flash('Militante atualizado com sucesso!', 'success') return redirect(url_for('listar_militantes')) except Exception as e: - session.rollback() + db_session.rollback() flash('Erro ao atualizar militante. Verifique se o CPF ou email já não estão cadastrados.', 'error') return render_template("editar_militante.html", militante=militante) return render_template("editar_militante.html", militante=militante) +# Rota para criar novo usuário +@app.route("/usuarios/novo", methods=["GET", "POST"]) +@login_required +def novo_usuario(): + if not session.get('is_admin'): + flash('Acesso negado. Apenas administradores podem criar novos usuários.', 'danger') + return redirect(url_for('home')) + + if request.method == "POST": + username = request.form.get("username") + email = request.form.get("email") + password = request.form.get("password") + confirm_password = request.form.get("confirm_password") + is_admin = bool(request.form.get("is_admin")) + + # Validações + if password != confirm_password: + flash('As senhas não conferem.', 'danger') + return render_template('novo_usuario.html') + + # Verificar se usuário já existe + if db_session.query(Usuario).filter_by(username=username).first(): + flash('Nome de usuário já existe.', 'danger') + return render_template('novo_usuario.html') + + if db_session.query(Usuario).filter_by(email=email).first(): + flash('E-mail já cadastrado.', 'danger') + return render_template('novo_usuario.html') + + # Criar novo usuário + try: + novo_usuario = Usuario( + username=username, + password=password, + is_admin=is_admin + ) + novo_usuario.email = email + + db_session.add(novo_usuario) + db_session.commit() + + # Gerar QR code usando a função do create_admin.py + qr_uri = novo_usuario.get_otp_uri() + qr_path = generate_qr_code(novo_usuario) + + flash('Usuário criado com sucesso!', 'success') + return render_template('mostrar_qr_code.html', qr_uri=qr_uri) + + except Exception as e: + db_session.rollback() + flash('Erro ao criar usuário. Por favor, tente novamente.', 'danger') + print(f"Erro: {e}") + return render_template('novo_usuario.html') + + return render_template('novo_usuario.html') + +@app.route('/check_session') +def check_session(): + if 'last_activity' not in session: + return jsonify({'expired': True}) + + last_activity = datetime.fromtimestamp(session['last_activity']) + now = datetime.now() + + if now - last_activity > timedelta(hours=2): + # Registrar o logout por timeout + try: + user = db_session.query(Usuario).get(session.get('user_id')) + if user: + user.ultimo_logout = datetime.now() + user.motivo_logout = "Timeout de sessão" + db_session.commit() + except Exception as e: + print(f"Erro ao registrar logout por timeout: {e}") + + session.clear() + return jsonify({'expired': True}) + + return jsonify({'expired': False}) def create_app(): app = Flask(__name__) @@ -304,7 +576,12 @@ def create_app(): # ... existing code ... return app - # Iniciar o servidor Flask if __name__ == "__main__": + # Verificar se existe usuário admin e gerar QR code + admin = db_session.query(Usuario).filter_by(username="admin").first() + if admin: + print("\n=== Gerando QR Code para usuário admin existente ===") + generate_qr_code(admin) + app.run(debug=True) diff --git a/config.py b/config.py new file mode 100644 index 0000000..a9766b0 --- /dev/null +++ b/config.py @@ -0,0 +1 @@ +SECRET_KEY = 'sua_chave_secreta_aqui' # Use uma chave segura em produção \ No newline at end of file diff --git a/create_admin.py b/create_admin.py new file mode 100644 index 0000000..626221f --- /dev/null +++ b/create_admin.py @@ -0,0 +1,136 @@ +from functions.database import init_database, Usuario, get_db_connection +import qrcode +import os +from pathlib import Path +import pyotp + +def generate_qr_code(user): + """ + Gera o QR code para um usuário específico + + Args: + user: Instância do modelo Usuario + + Returns: + Path: Caminho do arquivo QR code gerado + """ + import qrcode + + # Gerar QR Code apenas na raiz do projeto + qr_path = Path('admin_qr.png') + + # Remover arquivo antigo se existir + if qr_path.exists(): + os.remove(str(qr_path)) + + # Gerar e salvar QR Code + qr = qrcode.QRCode(version=1, box_size=10, border=5) + qr.add_data(user.get_otp_uri()) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white") + img.save(str(qr_path)) + + print(f"\nQR Code gerado em: {os.path.abspath(qr_path)}") + + return qr_path + +def create_admin_user(): + try: + # Inicializar o banco de dados + init_database() + + # Obter a sessão + db_session = get_db_connection() + + # Verificar se o admin foi criado + admin = db_session.query(Usuario).filter_by(username="admin").first() + + if admin: + print("\n=== Usuário Admin Encontrado ===") + # Atualizar QR code mesmo se o usuário já existir + qr_path = generate_qr_code(admin) + else: + print("\n=== Criando Novo Usuário Admin ===") + # Criar usuário admin com novo segredo OTP + admin = Usuario( + username="admin", + password="admin123", + is_admin=True + ) + admin.email = "admin@example.com" + + # Adicionar e fazer commit para obter o ID + db_session.add(admin) + db_session.commit() + + # Recarregar o usuário do banco para garantir dados atualizados + admin = db_session.query(Usuario).filter_by(username="admin").first() + + # Verificar e mostrar informações do OTP + print("\n=== Informações do Usuário Admin ===") + print(f"Username: admin") + print(f"Senha: admin123") + print(f"Email: {admin.email}") + print(f"Segredo OTP atual: {admin.otp_secret}") + + # Gerar URI do OTP usando o segredo armazenado + totp = pyotp.TOTP(admin.otp_secret) + otp_uri = totp.provisioning_uri( + name=admin.username, + issuer_name="Sistema de Controles" + ) + + # Usar a função extraída para gerar o QR code + qr_path = generate_qr_code(admin) + + print("\n=== QR Code Gerado ===") + print(f"QR Code salvo em: {qr_path}") + print(f"URI do OTP: {otp_uri}") + + # Gerar código atual para verificação + current_code = totp.now() + print("\n=== Verificação do OTP ===") + print(f"Código OTP atual: {current_code}") + print(f"Verificação do código: {totp.verify(current_code)}") + + print("\n=== Instruções para Configuração ===") + print("1. Instale um aplicativo autenticador no seu celular") + print(" (Google Authenticator, Microsoft Authenticator, etc)") + print("2. Abra o aplicativo") + print("3. Selecione a opção para adicionar uma nova conta") + print("4. Escaneie o QR Code salvo em:", qr_path) + print("\nOU configure manualmente:") + print(f"- Nome da conta: {admin.username}") + print(f"- Segredo: {admin.otp_secret}") + print("- Tipo: Baseado em tempo (TOTP)") + print("- Algoritmo: SHA1") + print("- Dígitos: 6") + print("- Intervalo: 30 segundos") + + # Verificação final + print("\n=== Teste de Verificação ===") + test_code = totp.now() + print(f"Código de teste: {test_code}") + is_valid = admin.verify_otp(test_code) + print(f"Verificação do código: {'Sucesso' if is_valid else 'Falha'}") + + if not is_valid: + print("\nALERTA: Verificação do OTP falhou!") + print("Por favor, verifique se o segredo OTP está correto.") + + except Exception as e: + print(f"\nErro durante a execução: {e}") + import traceback + traceback.print_exc() + finally: + if 'db_session' in locals(): + db_session.close() + +if __name__ == "__main__": + # Remover banco de dados existente para começar limpo + db_path = Path.home() / '.local' / 'share' / 'controles' / 'database.db' + if db_path.exists(): + os.remove(db_path) + print("Banco de dados antigo removido.") + + create_admin_user() \ No newline at end of file diff --git a/functions/database.py b/functions/database.py index 50bf650..d3b16d4 100644 --- a/functions/database.py +++ b/functions/database.py @@ -1,16 +1,35 @@ -from sqlalchemy import create_engine, Column, Integer, String, Boolean, Numeric, Date, ForeignKey +from sqlalchemy import create_engine, Column, Integer, String, Boolean, Numeric, Date, ForeignKey, DateTime from sqlalchemy.orm import relationship, sessionmaker from sqlalchemy.ext.declarative import declarative_base +from werkzeug.security import generate_password_hash, check_password_hash +import pyotp +import os +from pathlib import Path +from sqlalchemy.pool import NullPool + +# Configurar caminho do banco de dados +db_dir = Path.home() / '.local' / 'share' / 'controles' +db_dir.mkdir(parents=True, exist_ok=True) +db_path = db_dir / 'database.db' + +# Configurar engine com NullPool +engine = create_engine( + f'sqlite:///{db_path}', + echo=True, + poolclass=NullPool # Usar NullPool ao invés do pool padrão +) Base = declarative_base() -engine = create_engine('sqlite:///database.db', echo=True) SessionLocal = sessionmaker(bind=engine) def get_db_connection(): """ Retorna uma nova sessão do banco de dados """ - return SessionLocal() + try: + return SessionLocal() + finally: + engine.dispose() def execute_query(query, params=None): """ @@ -130,6 +149,7 @@ class Setor(Base): relatorios_cotas = relationship("RelatorioCotasMensais", back_populates="setor") relatorios_vendas = relationship("RelatorioVendasMateriais", back_populates="setor") + usuarios = relationship("Usuario", back_populates="setor") class ComiteCentral(Base): __tablename__ = 'comites_centrais' @@ -164,4 +184,144 @@ class RelatorioVendasMateriais(Base): setor = relationship("Setor", back_populates="relatorios_vendas") comite = relationship("ComiteCentral", back_populates="relatorios_vendas") -Base.metadata.create_all(engine) \ No newline at end of file +class Usuario(Base): + __tablename__ = 'usuarios' + + id = Column(Integer, primary_key=True, autoincrement=True) + username = Column(String(50), unique=True, nullable=False) + password_hash = Column(String(255), nullable=False) + email = Column(String(100), unique=True, nullable=False) + otp_secret = Column(String(32)) + role_id = Column(Integer, ForeignKey('roles.id')) + setor_id = Column(Integer, ForeignKey('setores.id')) + ativo = Column(Boolean, default=True) + is_admin = Column(Boolean, default=False) + ultimo_login = Column(DateTime) + ultimo_logout = Column(DateTime) + motivo_logout = Column(String(100)) + + role = relationship("Role", back_populates="usuarios") + setor = relationship("Setor", back_populates="usuarios") + + def __init__(self, username, password, is_admin=False): + self.username = username + self.password_hash = generate_password_hash(password) + self.is_admin = is_admin + self.otp_secret = pyotp.random_base32() # Gerar segredo OTP na criação + self.ativo = True + + def check_password(self, password): + return check_password_hash(self.password_hash, password) + + def verify_otp(self, otp_code): + """Verifica se o código OTP fornecido é válido""" + if not self.otp_secret: + print(f"Erro: Usuário {self.username} não tem segredo OTP configurado") + return False + + totp = pyotp.TOTP(self.otp_secret) + is_valid = totp.verify(otp_code) + print(f"Verificando OTP para {self.username}") + print(f"Segredo: {self.otp_secret}") + print(f"Código fornecido: {otp_code}") + print(f"Resultado: {'válido' if is_valid else 'inválido'}") + return is_valid + + def get_otp_uri(self): + """Gera a URI para o QR code do OTP""" + if not self.otp_secret: + self.otp_secret = pyotp.random_base32() + + totp = pyotp.TOTP(self.otp_secret) + return totp.provisioning_uri( + name=self.username, + issuer_name="Sistema de Controles" + ) + +class Role(Base): + __tablename__ = 'roles' + + id = Column(Integer, primary_key=True, autoincrement=True) + nome = Column(String(50), unique=True, nullable=False) + nivel = Column(Integer, nullable=False) # Nível hierárquico (1: admin, 2: coordenador, 3: militante) + + usuarios = relationship("Usuario", back_populates="role") + permissoes = relationship("RolePermissao", back_populates="role") + +class Permissao(Base): + __tablename__ = 'permissoes' + + id = Column(Integer, primary_key=True, autoincrement=True) + nome = Column(String(50), unique=True, nullable=False) + descricao = Column(String(255)) + + roles = relationship("RolePermissao", back_populates="permissao") + +class RolePermissao(Base): + __tablename__ = 'roles_permissoes' + + role_id = Column(Integer, ForeignKey('roles.id'), primary_key=True) + permissao_id = Column(Integer, ForeignKey('permissoes.id'), primary_key=True) + + role = relationship("Role", back_populates="permissoes") + permissao = relationship("Permissao", back_populates="roles") + +# Remover o banco de dados existente (se existir) +if os.path.exists(db_path): + os.remove(db_path) + +def init_database(): + """Inicializa o banco de dados com dados básicos""" + print("Inicializando banco de dados...") + + # Criar todas as tabelas + Base.metadata.create_all(engine) + + session = SessionLocal() + try: + # Verificar se já existe um admin + admin = session.query(Usuario).filter_by(username="admin").first() + + if not admin: + print("Criando role de administrador...") + # Criar role de admin + admin_role = session.query(Role).filter_by(nome="Administrador").first() + if not admin_role: + admin_role = Role(nome="Administrador", nivel=1) + session.add(admin_role) + session.commit() + + print("Criando usuário admin...") + # Criar usuário admin + admin = Usuario( + username="admin", + password="admin123", + is_admin=True + ) + admin.email = "admin@example.com" + admin.role_id = admin_role.id + + session.add(admin) + session.commit() + + print("=== Usuário Admin Criado ===") + print(f"Username: admin") + print(f"Senha: admin123") + print(f"Email: {admin.email}") + print(f"OTP Secret: {admin.otp_secret}") + else: + print("Usuário admin já existe") + + except Exception as e: + print(f"Erro na inicialização do banco: {e}") + session.rollback() + raise + finally: + session.close() + +# Inicializar o banco de dados automaticamente quando o módulo for importado +init_database() + +# Executar a criação dos dados iniciais +if __name__ == "__main__": + init_database() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 3b1bd8a..299e71c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,20 +1,14 @@ -black==24.10.0 -blinker==1.9.0 -click==8.1.7 -Flask==3.1.0 -greenlet==3.1.1 -importlib_metadata==8.5.0 -itsdangerous==2.2.0 -Jinja2==3.1.4 -MarkupSafe==3.0.2 -mypy-extensions==1.0.0 -mysql-connector-python==9.1.0 -packaging==24.2 -pathspec==0.12.1 -platformdirs==4.3.6 -SQLAlchemy==2.0.36 -tomli==2.2.1 -typing_extensions==4.12.2 -Werkzeug==3.1.3 -zipp==3.21.0 +Flask==3.0.2 +Flask-SQLAlchemy==3.1.1 +SQLAlchemy==2.0.39 +Werkzeug==3.0.1 +pyotp==2.9.0 +qrcode==7.4.2 +pillow==11.0.0 +python-dotenv==1.0.1 +flask-login==0.6.3 +flask-wtf==1.2.1 +email-validator==2.1.0.post1 Bootstrap-Flask==2.4.1 +flask-bootstrap5==0.1.dev1 + diff --git a/scripts/init_db.py b/scripts/init_db.py new file mode 100644 index 0000000..774bcf7 --- /dev/null +++ b/scripts/init_db.py @@ -0,0 +1,27 @@ +from functions.database import Role, Permissao, RolePermissao, Base, engine +from sqlalchemy.orm import Session + +def init_db(): + Base.metadata.create_all(engine) + + with Session(engine) as session: + # Criar roles + admin = Role(nome='Administrador', nivel=1) + coord = Role(nome='Coordenador', nivel=2) + milit = Role(nome='Militante', nivel=3) + + # Criar permissões + perm_admin = Permissao(nome='admin', descricao='Acesso total') + perm_militantes = Permissao(nome='ver_militantes', descricao='Ver militantes') + # ... outras permissões ... + + session.add_all([admin, coord, milit, perm_admin, perm_militantes]) + session.commit() + + # Associar permissões aos roles + session.add(RolePermissao(role=admin, permissao=perm_admin)) + session.add(RolePermissao(role=coord, permissao=perm_militantes)) + session.commit() + +if __name__ == '__main__': + init_db() \ No newline at end of file diff --git a/templates/base.html b/templates/base.html index d2f91cc..35dc81a 100644 --- a/templates/base.html +++ b/templates/base.html @@ -7,6 +7,7 @@ {{ bootstrap.load_css() }} + {% if 'user_id' in session %} + {% endif %}
{% block content %}{% endblock %}
{{ bootstrap.load_js() }} + + + + {% if 'user_id' in session %} + + {% endif %} \ No newline at end of file diff --git a/templates/home.html b/templates/home.html index 31779b9..78bbb56 100644 --- a/templates/home.html +++ b/templates/home.html @@ -3,15 +3,26 @@ {% block title %}Início{% endblock %} {% block content %} -
-
-

Menu do Sistema

-
- {% for url, endpoint in links %} - - {{ endpoint|replace('_', ' ')|title }} - - {% endfor %} +
+
+
+

Menu do Sistema

+ + {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} +
{{ message }}
+ {% endfor %} + {% endif %} + {% endwith %} + +
+ {% for url, nome in links %} + + {{ nome }} + + {% endfor %} +
diff --git a/templates/login.html b/templates/login.html new file mode 100644 index 0000000..3a40097 --- /dev/null +++ b/templates/login.html @@ -0,0 +1,53 @@ +{% extends 'base.html' %} + +{% block title %}Login{% endblock %} + +{% block content %} +
+
+
+
+
+

Login

+
+
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} +
+ {{ message }} + {% if category == 'danger' %} +
+ Se o problema persistir, contate o administrador. + {% endif %} +
+ {% endfor %} + {% endif %} + {% endwith %} + +
+
+ + +
+
+ + +
+
+ + + Digite o código de 6 dígitos do seu aplicativo autenticador +
+
+ +
+
+
+
+
+
+
+{% endblock %} \ No newline at end of file diff --git a/templates/mostrar_qr_code.html b/templates/mostrar_qr_code.html new file mode 100644 index 0000000..7dadae4 --- /dev/null +++ b/templates/mostrar_qr_code.html @@ -0,0 +1,41 @@ +{% extends 'base.html' %} + +{% block title %}Configurar Autenticação em Dois Fatores{% endblock %} + +{% block content %} +
+
+
+
+
+

Configure a Autenticação em Dois Fatores

+
+
+

Siga os passos abaixo para configurar a autenticação em dois fatores:

+ +
    +
  1. Instale um aplicativo autenticador no seu celular (Google Authenticator, Microsoft Authenticator, etc)
  2. +
  3. Abra o aplicativo e escaneie o QR Code abaixo
  4. +
  5. O aplicativo irá gerar um código de 6 dígitos a cada 30 segundos
  6. +
  7. Use este código ao fazer login no sistema
  8. +
+ +
+ QR Code para OTP +
+ +
+ Importante: Guarde este QR Code em um lugar seguro. + Você precisará dele caso troque de celular ou reinstale o aplicativo autenticador. +
+ + +
+
+
+
+
+{% endblock %} \ No newline at end of file diff --git a/templates/novo_usuario.html b/templates/novo_usuario.html new file mode 100644 index 0000000..f3ad9d3 --- /dev/null +++ b/templates/novo_usuario.html @@ -0,0 +1,58 @@ +{% extends 'base.html' %} + +{% block title %}Novo Usuário{% endblock %} + +{% block content %} +
+
+
+
+
+

Cadastro de Novo Usuário

+
+
+ {% with messages = get_flashed_messages(with_categories=true) %} + {% if messages %} + {% for category, message in messages %} +
{{ message }}
+ {% endfor %} + {% endif %} + {% endwith %} + +
+
+ + +
+ +
+ + +
+ +
+ + +
+ +
+ + +
+ + {% if session.get('is_admin') %} +
+ + +
+ {% endif %} + + + Voltar +
+
+
+
+
+
+{% endblock %} \ No newline at end of file