203 lines
6.6 KiB
Python
203 lines
6.6 KiB
Python
|
|
from flask import request, jsonify, flash, session
|
||
|
|
from flask_login import current_user
|
||
|
|
from datetime import datetime
|
||
|
|
import secrets
|
||
|
|
import pyotp
|
||
|
|
|
||
|
|
from models.entities.usuario import Usuario
|
||
|
|
from services.usuario_service import UsuarioService
|
||
|
|
from services.database_service import DatabaseService
|
||
|
|
|
||
|
|
class UsuarioController:
|
||
|
|
"""Controlador para operações com usuários"""
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def listar_usuarios():
|
||
|
|
"""Lista todos os usuários do sistema"""
|
||
|
|
return UsuarioService.listar_usuarios()
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def buscar_usuario(user_id):
|
||
|
|
"""Busca um usuário pelo ID"""
|
||
|
|
return UsuarioService.buscar_usuario(user_id)
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def criar_usuario(data):
|
||
|
|
"""Cria um novo usuário"""
|
||
|
|
# Verificar campos obrigatórios
|
||
|
|
required_fields = ['username', 'password', 'email']
|
||
|
|
for field in required_fields:
|
||
|
|
if field not in data:
|
||
|
|
flash(f'Campo {field} é obrigatório.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
# Verificar se usuário já existe
|
||
|
|
if UsuarioService.buscar_por_username(data['username']):
|
||
|
|
flash('Nome de usuário já existe.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
try:
|
||
|
|
# Criar usuário
|
||
|
|
usuario = Usuario(
|
||
|
|
username=data['username'],
|
||
|
|
email=data['email'],
|
||
|
|
nome=data.get('nome'),
|
||
|
|
is_admin=data.get('is_admin', False)
|
||
|
|
)
|
||
|
|
|
||
|
|
# Definir senha
|
||
|
|
usuario.set_password(data['password'])
|
||
|
|
|
||
|
|
# Gerar OTP secret
|
||
|
|
usuario.otp_secret = pyotp.random_base32()
|
||
|
|
|
||
|
|
# Definir outros campos
|
||
|
|
if 'role_id' in data:
|
||
|
|
usuario.role_id = data['role_id']
|
||
|
|
|
||
|
|
if 'setor_id' in data:
|
||
|
|
usuario.setor_id = data['setor_id']
|
||
|
|
|
||
|
|
# Salvar no banco
|
||
|
|
result = UsuarioService.salvar_usuario(usuario)
|
||
|
|
|
||
|
|
if result:
|
||
|
|
flash('Usuário cadastrado com sucesso!', 'success')
|
||
|
|
return True
|
||
|
|
else:
|
||
|
|
flash('Erro ao cadastrar usuário.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
flash(f'Erro ao cadastrar usuário: {str(e)}', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def atualizar_usuario(user_id, data):
|
||
|
|
"""Atualiza um usuário existente"""
|
||
|
|
usuario = UsuarioService.buscar_usuario(user_id)
|
||
|
|
if not usuario:
|
||
|
|
flash('Usuário não encontrado.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
# Atualizar campos
|
||
|
|
if 'email' in data:
|
||
|
|
usuario.email = data['email']
|
||
|
|
|
||
|
|
if 'nome' in data:
|
||
|
|
usuario.nome = data['nome']
|
||
|
|
|
||
|
|
if 'role_id' in data:
|
||
|
|
usuario.role_id = data['role_id']
|
||
|
|
|
||
|
|
if 'setor_id' in data:
|
||
|
|
usuario.setor_id = data['setor_id']
|
||
|
|
|
||
|
|
if 'is_admin' in data:
|
||
|
|
usuario.is_admin = data['is_admin']
|
||
|
|
|
||
|
|
if 'password' in data and data['password']:
|
||
|
|
usuario.set_password(data['password'])
|
||
|
|
|
||
|
|
# Salvar alterações
|
||
|
|
result = UsuarioService.salvar_usuario(usuario)
|
||
|
|
|
||
|
|
if result:
|
||
|
|
flash('Usuário atualizado com sucesso!', 'success')
|
||
|
|
return True
|
||
|
|
else:
|
||
|
|
flash('Erro ao atualizar usuário.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def toggle_status(user_id):
|
||
|
|
"""Ativa/desativa um usuário"""
|
||
|
|
if not current_user.is_admin:
|
||
|
|
return jsonify({
|
||
|
|
'success': False,
|
||
|
|
'error': 'Você não tem permissão para alterar o status de usuários.'
|
||
|
|
}), 403
|
||
|
|
|
||
|
|
usuario = UsuarioService.buscar_usuario(user_id)
|
||
|
|
if not usuario:
|
||
|
|
return jsonify({
|
||
|
|
'success': False,
|
||
|
|
'error': 'Usuário não encontrado.'
|
||
|
|
}), 404
|
||
|
|
|
||
|
|
usuario.ativo = not usuario.ativo
|
||
|
|
if UsuarioService.salvar_usuario(usuario):
|
||
|
|
return jsonify({
|
||
|
|
'success': True,
|
||
|
|
'message': f'Usuário {\'ativado\' if usuario.ativo else \'desativado\'}'
|
||
|
|
})
|
||
|
|
else:
|
||
|
|
return jsonify({
|
||
|
|
'success': False,
|
||
|
|
'error': 'Erro ao salvar alterações.'
|
||
|
|
}), 500
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def reset_password(user_id):
|
||
|
|
"""Reseta a senha de um usuário"""
|
||
|
|
usuario = UsuarioService.buscar_usuario(user_id)
|
||
|
|
if not usuario:
|
||
|
|
flash('Usuário não encontrado.', 'danger')
|
||
|
|
return False, None
|
||
|
|
|
||
|
|
# Gerar nova senha
|
||
|
|
new_password = secrets.token_urlsafe(8)
|
||
|
|
usuario.set_password(new_password)
|
||
|
|
|
||
|
|
# Salvar alterações
|
||
|
|
if UsuarioService.salvar_usuario(usuario):
|
||
|
|
return True, new_password
|
||
|
|
else:
|
||
|
|
flash('Erro ao resetar senha.', 'danger')
|
||
|
|
return False, None
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def reset_otp(user_id):
|
||
|
|
"""Reseta o OTP de um usuário"""
|
||
|
|
usuario = UsuarioService.buscar_usuario(user_id)
|
||
|
|
if not usuario:
|
||
|
|
flash('Usuário não encontrado.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
# Gerar novo OTP secret
|
||
|
|
usuario.otp_secret = pyotp.random_base32()
|
||
|
|
|
||
|
|
# Salvar alterações
|
||
|
|
if UsuarioService.salvar_usuario(usuario):
|
||
|
|
flash(f'OTP resetado com sucesso para {usuario.email}.', 'success')
|
||
|
|
return True
|
||
|
|
else:
|
||
|
|
flash('Erro ao resetar OTP.', 'danger')
|
||
|
|
return False
|
||
|
|
|
||
|
|
@staticmethod
|
||
|
|
def check_session():
|
||
|
|
"""Verifica status da sessão"""
|
||
|
|
if 'user_id' not in session:
|
||
|
|
return {'status': 'expired'}
|
||
|
|
|
||
|
|
if 'last_activity' in session:
|
||
|
|
last_activity = datetime.fromtimestamp(session['last_activity'])
|
||
|
|
now = datetime.now()
|
||
|
|
|
||
|
|
if now - last_activity > timedelta(hours=2):
|
||
|
|
# Registrar o logout por timeout
|
||
|
|
try:
|
||
|
|
user = UsuarioService.buscar_usuario(session.get('user_id'))
|
||
|
|
if user:
|
||
|
|
user.ultimo_logout = datetime.now()
|
||
|
|
user.motivo_logout = "Timeout de sessão"
|
||
|
|
UsuarioService.salvar_usuario(user)
|
||
|
|
except Exception as e:
|
||
|
|
print(f"Erro ao registrar logout por timeout: {e}")
|
||
|
|
|
||
|
|
session.clear()
|
||
|
|
return {'status': 'expired'}
|
||
|
|
|
||
|
|
return {'status': 'active'}
|