feat: implementa sistema de responsabilidades e instâncias - Adiciona responsabilidades de Finanças e Imprensa para todas as instâncias - Cria templates genéricos para gerenciamento de instâncias - Implementa sistema de permissões baseado em RBAC - Adiciona status de Aspirante com avaliação obrigatória - Atualiza documentação com novas regras e responsabilidades - Cria testes para validação das permissões - Adiciona migração para novos campos no banco de dados

This commit is contained in:
LS
2025-04-03 15:58:07 -03:00
parent 8dac8dc234
commit cbaf227e58
37 changed files with 4305 additions and 953 deletions

21
functions/base.py Normal file
View File

@@ -0,0 +1,21 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import os
# Configuração do banco de dados
DATABASE_URL = os.getenv('DATABASE_URL', 'sqlite:///database.db')
engine = create_engine(DATABASE_URL)
Session = sessionmaker(bind=engine)
# Base declarativa do SQLAlchemy
Base = declarative_base()
def get_db_connection():
"""Retorna uma nova sessão do banco de dados"""
session = Session()
try:
return session
except Exception as e:
session.rollback()
raise e

View File

@@ -1,39 +1,47 @@
from sqlalchemy import create_engine, Column, Integer, String, Boolean, Numeric, Date, ForeignKey, DateTime, Text
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime, timedelta
from werkzeug.security import generate_password_hash, check_password_hash
import pyotp
from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Text, Numeric, Date, Enum
from sqlalchemy.orm import sessionmaker, relationship, backref
import os
import pyotp
from pathlib import Path
from sqlalchemy.pool import NullPool
from datetime import datetime, timedelta
import secrets
from flask_mail import Message
from flask import url_for
import enum
from flask_login import UserMixin
from .rbac import Role, Permission, role_permissions, user_roles
from .base import Base, engine, Session
# Configurar caminho do banco de dados
db_dir = Path.home() / '.local' / 'share' / 'controles'
db_dir.mkdir(parents=True, exist_ok=True)
db_path = db_dir / 'database.db'
# Configurar engine com NullPool
engine = create_engine(
f'sqlite:///{db_path}',
echo=True,
poolclass=NullPool # Usar NullPool ao invés do pool padrão
)
Base = declarative_base()
SessionLocal = sessionmaker(bind=engine)
def get_db_connection():
"""
Retorna uma nova sessão do banco de dados SQLite
Retorna uma nova sessão do banco de dados SQLite e verifica timeout
"""
session = SessionLocal()
try:
return SessionLocal()
finally:
engine.dispose()
# Verificar timeout para usuários logados
usuario_atual = session.query(Usuario).filter(
Usuario.ultimo_login.isnot(None),
Usuario.ultimo_logout.is_(None)
).first()
if usuario_atual and usuario_atual.check_session_timeout():
usuario_atual.logout()
session.commit()
raise Exception("Sessão expirada. Por favor, faça login novamente.")
return session
except Exception as e:
session.close()
raise e
def execute_query(query, params=None):
"""
@@ -68,6 +76,7 @@ class Celula(Base):
secretario_rel = relationship("Militante", foreign_keys=[secretario])
responsavel_financas_rel = relationship("Militante", foreign_keys=[responsavel_financas])
pagamentos = relationship("PagamentoCelula", back_populates="celula")
usuarios = relationship("Usuario", back_populates="celula")
class ComiteRegional(Base):
__tablename__ = 'comites_regionais'
@@ -86,6 +95,7 @@ class ComiteRegional(Base):
correspondente_jornal_rel = relationship("Militante", foreign_keys=[correspondente_jornal])
setores = relationship("Setor", back_populates="cr")
celulas = relationship("Celula", back_populates="cr")
usuarios = relationship("Usuario", back_populates="cr")
class EmailMilitante(Base):
__tablename__ = 'emails_militantes'
@@ -159,6 +169,13 @@ class Militante(Base):
otp_secret = Column(String(32))
temp_token = Column(String(64))
temp_token_expiry = Column(DateTime)
# Novo campo para Quadro-Orientador
quadro_orientador = Column(Boolean, default=False)
# Campos para Aspirante
aspirante = Column(Boolean, default=True) # Por padrão, todo novo militante é aspirante
data_inicio_aspirante = Column(DateTime, default=datetime.utcnow)
avaliacao_aspirante = Column(Text)
data_avaliacao_aspirante = Column(DateTime)
# Relacionamentos existentes
cotas_mensais = relationship("CotaMensal", back_populates="militante")
@@ -175,6 +192,10 @@ class Militante(Base):
MNS = 8
MPS = 16
JUVENTUDE = 32
QUADRO_ORIENTADOR = 64
ASPIRANTE = 128
RESPONSAVEL_FINANCAS = 256
RESPONSAVEL_IMPRENSA = 512
@staticmethod
def get_responsabilidades_list():
@@ -184,7 +205,11 @@ class Militante(Base):
(Militante.IMPRENSA, "Imprensa"),
(Militante.MNS, "MNS"),
(Militante.MPS, "MPS"),
(Militante.JUVENTUDE, "Juventude")
(Militante.JUVENTUDE, "Juventude"),
(Militante.QUADRO_ORIENTADOR, "Quadro-Orientador"),
(Militante.ASPIRANTE, "Aspirante"),
(Militante.RESPONSAVEL_FINANCAS, "Responsável de Finanças"),
(Militante.RESPONSAVEL_IMPRENSA, "Responsável de Imprensa")
]
def set_responsabilidades(self, resp_list):
@@ -243,6 +268,26 @@ class Militante(Base):
mail.send(msg)
def generate_username(self):
"""Gera um nome de usuário único baseado no primeiro nome e um código"""
from sqlalchemy import func
db = get_db_connection()
try:
# Pega o primeiro nome
primeiro_nome = self.nome.split()[0].lower()
# Conta quantos usuários já existem com esse prefixo
count = db.query(func.count(Usuario.id)).filter(
Usuario.username.like(f"{primeiro_nome}%")
).scalar()
# Gera o código (número sequencial)
codigo = str(count + 1).zfill(3)
return f"{primeiro_nome}{codigo}"
finally:
db.close()
class CotaMensal(Base):
__tablename__ = 'cotas_mensais'
@@ -375,10 +420,16 @@ class RelatorioVendasMateriais(Base):
setor = relationship("Setor", back_populates="relatorios_vendas")
comite = relationship("ComiteCentral", back_populates="relatorios_vendas")
class Usuario(Base):
class TipoUsuario(enum.Enum):
ADMIN = "admin"
CR_RESPONSAVEL = "cr_responsavel"
SETOR_RESPONSAVEL = "setor_responsavel"
USUARIO = "usuario"
class Usuario(Base, UserMixin):
__tablename__ = 'usuarios'
id = Column(Integer, primary_key=True, autoincrement=True)
id = Column(Integer, primary_key=True)
username = Column(String(50), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False)
email = Column(String(100), unique=True, nullable=False)
@@ -392,74 +443,111 @@ class Usuario(Base):
motivo_logout = Column(String(100))
cr_id = Column(Integer, ForeignKey('comites_regionais.id'))
celula_id = Column(Integer, ForeignKey('celulas.id'))
session_timeout = Column(Integer, default=30)
tipo = Column(String(17), nullable=False)
ultima_atividade = Column(DateTime, default=datetime.utcnow)
# Relacionamento com militante
militante_id = Column(Integer, ForeignKey('militantes.id'))
militante = relationship("Militante", backref=backref("usuario", uselist=False))
role = relationship("Role", back_populates="usuarios")
setor = relationship("Setor", back_populates="usuarios")
celula = relationship("Celula")
cr = relationship("ComiteRegional")
# Relacionamentos
roles = relationship("Role", secondary="user_roles", back_populates="users")
setor = relationship('Setor', back_populates='usuarios')
cr = relationship('ComiteRegional', back_populates='usuarios')
celula = relationship('Celula', back_populates='usuarios')
def __init__(self, username, password, is_admin=False):
def get_id(self):
return str(self.id)
@property
def is_authenticated(self):
return True
@property
def is_active(self):
return self.ativo
@property
def is_anonymous(self):
return False
def __init__(self, username, password, is_admin=False, email=None, tipo="USUARIO"):
self.username = username
self.password_hash = generate_password_hash(password)
self.is_admin = is_admin
self.otp_secret = pyotp.random_base32() # Gerar segredo OTP na criação
self.email = email
self.ativo = True
self.session_timeout = 30
self.tipo = tipo
self.ultima_atividade = datetime.utcnow()
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def verify_otp(self, otp_code):
"""Verifica se o código OTP fornecido é válido"""
if not self.otp_secret:
print(f"Erro: Usuário {self.username} não tem segredo OTP configurado")
return False
totp = pyotp.TOTP(self.otp_secret)
is_valid = totp.verify(otp_code)
print(f"Verificando OTP para {self.username}")
print(f"Segredo: {self.otp_secret}")
print(f"Código fornecido: {otp_code}")
print(f"Resultado: {'válido' if is_valid else 'inválido'}")
return is_valid
def update_last_activity(self):
self.ultima_atividade = datetime.utcnow()
def is_session_expired(self):
if not self.ultima_atividade:
return True
time_diff = datetime.utcnow() - self.ultima_atividade
return time_diff.total_seconds() > (self.session_timeout * 60)
def check_session_timeout(self):
"""Verifica se a sessão do usuário expirou"""
if not self.ultima_atividade:
return True
time_diff = datetime.utcnow() - self.ultima_atividade
return time_diff.total_seconds() > (self.session_timeout * 60)
def has_permission(self, permission_name):
"""Verifica se o usuário tem uma determinada permissão"""
for role in self.roles:
for permission in role.permissions:
if permission.nome == permission_name:
return True
return False
def has_role(self, role_nivel):
"""Verifica se o usuário tem um determinado nível de role"""
for role in self.roles:
if role.nivel == role_nivel:
return True
return False
def get_otp_uri(self):
"""Gera a URI para o QR code do OTP"""
"""Gera a URI para autenticação em duas etapas"""
if not self.otp_secret:
self.otp_secret = pyotp.random_base32()
totp = pyotp.TOTP(self.otp_secret)
return totp.provisioning_uri(
name=self.username,
return pyotp.totp.TOTP(self.otp_secret).provisioning_uri(
self.username,
issuer_name="Sistema de Controles"
)
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(50), unique=True, nullable=False)
nivel = Column(Integer, nullable=False) # Nível hierárquico (1: admin, 2: coordenador, 3: militante)
usuarios = relationship("Usuario", back_populates="role")
permissoes = relationship("RolePermissao", back_populates="role")
def verify_otp(self, code):
"""Verifica se um código OTP é válido"""
if not self.otp_secret:
print(f"Erro: OTP secret não configurado para o usuário {self.username}")
return False
print(f"Verificando OTP para usuário {self.username}")
print(f"OTP Secret: {self.otp_secret}")
print(f"Código fornecido: {code}")
totp = pyotp.totp.TOTP(self.otp_secret)
is_valid = totp.verify(code)
print(f"Resultado da verificação: {'Válido' if is_valid else 'Inválido'}")
print(f"Tempo atual: {datetime.utcnow()}")
print(f"Período atual: {totp.timecode(datetime.utcnow())}")
return is_valid
class Permissao(Base):
__tablename__ = 'permissoes'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(50), unique=True, nullable=False)
descricao = Column(String(255))
roles = relationship("RolePermissao", back_populates="permissao")
class RolePermissao(Base):
__tablename__ = 'roles_permissoes'
role_id = Column(Integer, ForeignKey('roles.id'), primary_key=True)
permissao_id = Column(Integer, ForeignKey('permissoes.id'), primary_key=True)
role = relationship("Role", back_populates="permissoes")
permissao = relationship("Permissao", back_populates="roles")
def logout(self):
"""Registra o logout do usuário"""
self.ultimo_logout = datetime.utcnow()
self.motivo_logout = "Logout manual"
self.ultima_atividade = None
class PagamentoCelula(Base):
__tablename__ = 'pagamentos_celula'
@@ -537,12 +625,9 @@ class TransacaoPIX(Base):
if os.path.exists(db_path):
os.remove(db_path)
def init_database():
"""Inicializa o banco de dados com dados básicos"""
print("Inicializando banco de dados...")
# Criar todas as tabelas
Base.metadata.create_all(engine)
def init_rbac():
"""Inicializa o sistema RBAC"""
print("Inicializando sistema RBAC...")
session = SessionLocal()
try:
@@ -554,7 +639,7 @@ def init_database():
# Criar role de admin
admin_role = session.query(Role).filter_by(nome="Administrador").first()
if not admin_role:
admin_role = Role(nome="Administrador", nivel=1)
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL)
session.add(admin_role)
session.commit()
@@ -568,6 +653,11 @@ def init_database():
admin.email = "admin@example.com"
admin.role_id = admin_role.id
# Adicionar apenas a permissão de system_config ao admin
permission = session.query(Permission).filter_by(nome='system_config').first()
if permission and permission not in admin_role.permissions:
admin_role.permissions.append(permission)
session.add(admin)
session.commit()
@@ -578,7 +668,100 @@ def init_database():
print(f"OTP Secret: {admin.otp_secret}")
else:
print("Usuário admin já existe")
# Garantir que o admin tenha apenas a permissão de system_config
admin_role = session.query(Role).filter_by(nome="Administrador").first()
if admin_role:
# Remover todas as permissões atuais
admin_role.permissions = []
# Adicionar apenas a permissão de system_config
permission = session.query(Permission).filter_by(nome='system_config').first()
if permission:
admin_role.permissions.append(permission)
session.commit()
except Exception as e:
print(f"Erro na inicialização do sistema RBAC: {e}")
session.rollback()
raise
finally:
session.close()
def init_database():
"""Inicializa o banco de dados com dados básicos"""
print("Inicializando banco de dados...")
# Criar todas as tabelas
Base.metadata.drop_all(engine) # Remover todas as tabelas existentes
Base.metadata.create_all(engine)
session = SessionLocal()
try:
# Criar role de administrador
admin_role = Role(nome="Administrador", nivel=Role.SECRETARIO_GERAL)
session.add(admin_role)
session.commit()
# Verificar se existe um QR code salvo
qr_path = Path('admin_qr.png')
admin_otp_secret = None
if qr_path.exists():
# Extrair o segredo OTP do nome do arquivo temporário dentro do QR
try:
import re
with open('admin_qr.txt', 'r') as f:
qr_content = f.read()
# O segredo OTP está no formato otpauth://totp/admin?secret=XXXXX&issuer=Sistema%20de%20Controles
match = re.search(r'secret=([A-Z0-9]+)&', qr_content)
if match:
admin_otp_secret = match.group(1)
print(f"Usando OTP existente: {admin_otp_secret}")
except Exception as e:
print(f"Erro ao ler OTP existente: {e}")
if not admin_otp_secret:
admin_otp_secret = pyotp.random_base32()
print(f"Novo OTP gerado: {admin_otp_secret}")
# Criar usuário admin
admin = Usuario(
username="admin",
password="admin123",
is_admin=True,
email="admin@example.com",
tipo="ADMIN"
)
admin.role_id = admin_role.id
admin.otp_secret = admin_otp_secret
session.add(admin)
session.commit()
# Gerar novo QR code se não existir
if not qr_path.exists():
totp = pyotp.totp.TOTP(admin_otp_secret)
provisioning_uri = totp.provisioning_uri("admin", issuer_name="Sistema de Controles")
# Salvar a URI em um arquivo texto para referência futura
with open('admin_qr.txt', 'w') as f:
f.write(provisioning_uri)
# Gerar QR code
import qrcode
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img.save('admin_qr.png')
print("=== Usuário Admin Criado ===")
print(f"Username: admin")
print(f"Senha: admin123")
print(f"Email: {admin.email}")
print(f"OTP Secret: {admin.otp_secret}")
print(f"QR Code: {qr_path}")
except Exception as e:
print(f"Erro na inicialização do banco: {e}")
session.rollback()
@@ -586,6 +769,9 @@ def init_database():
finally:
session.close()
# Inicializar o sistema RBAC
init_rbac()
# Inicializar o banco de dados automaticamente quando o módulo for importado
init_database()

197
functions/decorators.py Normal file
View File

@@ -0,0 +1,197 @@
from functools import wraps
from flask import session, redirect, url_for, flash
from flask_login import current_user, login_required
from sqlalchemy.orm import joinedload
from .database import get_db_connection, Usuario
from .rbac import Permission
def require_login(f):
"""Decorador para verificar se o usuário está logado"""
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('login'))
db = get_db_connection()
try:
# Carregar o usuário com suas roles
user = db.query(Usuario).options(
joinedload(Usuario.roles)
).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('login'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
def require_permission(permission_name):
"""Decorador para verificar se o usuário tem uma permissão específica"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('login'))
db = get_db_connection()
try:
user = db.query(Usuario).get(current_user.id)
if not user or not user.has_permission(permission_name):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_role(role_name):
"""Decorador para verificar se o usuário tem um papel específico"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('login'))
db = get_db_connection()
try:
user = db.query(Usuario).get(current_user.id)
if not user or not user.has_role(role_name):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_minimum_role(min_level):
"""Decorador para verificar se o usuário tem um papel com nível mínimo"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not current_user.is_authenticated:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('login'))
db = get_db_connection()
try:
user = db.query(Usuario).get(current_user.id)
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('login'))
highest_role = user.get_highest_role()
if not highest_role or highest_role.nivel < min_level:
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_instance_permission(permission_name):
"""Decorator para verificar se o usuário tem permissão em uma instância específica"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('login'))
db = get_db_connection()
try:
user = db.query(Usuario).get(session['user_id'])
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('login'))
# Verificar se o usuário tem a permissão em alguma instância
if not (user.has_permission(permission_name) or
user.has_permission(f"{permission_name}_sector") or
user.has_permission(f"{permission_name}_cr") or
user.has_permission(f"{permission_name}_cc")):
flash('Você não tem permissão para acessar esta página.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator
def require_instance_access(instance_type, instance_id):
"""Decorator para verificar se o usuário tem acesso a uma instância específica"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Você precisa estar logado para acessar esta página.', 'error')
return redirect(url_for('login'))
db = get_db_connection()
try:
user = db.query(Usuario).get(session['user_id'])
if not user:
flash('Usuário não encontrado.', 'error')
return redirect(url_for('login'))
# Verificar acesso baseado na instância do usuário
if instance_type == 'celula':
if not (user.celula_id == instance_id or
user.has_permission(Permission.VIEW_SECTOR_REPORTS) or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a esta célula.', 'error')
return redirect(url_for('index'))
elif instance_type == 'setor':
if not (user.setor_id == instance_id or
user.has_permission(Permission.VIEW_CR_REPORTS) or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este setor.', 'error')
return redirect(url_for('index'))
elif instance_type == 'cr':
if not (user.cr_id == instance_id or
user.has_permission(Permission.VIEW_CC_REPORTS)):
flash('Você não tem acesso a este CR.', 'error')
return redirect(url_for('index'))
# Atualiza timestamp da última atividade
user.update_last_activity()
db.commit()
return f(*args, **kwargs)
finally:
db.close()
return decorated_function
return decorator

222
functions/permissions.py Normal file
View File

@@ -0,0 +1,222 @@
from functools import wraps
from flask import abort, g
from .database import Militante, Celula, Setor, CR, CC
def check_permission(permission_func):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if not permission_func(*args, **kwargs):
abort(403)
return f(*args, **kwargs)
return decorated_function
return decorator
def can_manage_militante(militante_id):
"""Verifica se o usuário atual pode gerenciar um militante específico."""
if not g.user or not g.user.militante:
return False
militante = Militante.query.get(militante_id)
if not militante:
return False
# Secretário Geral e Secretário de Organização podem gerenciar qualquer militante
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Secretário de CC pode gerenciar militantes do seu CC
if g.user.militante.responsabilidades & Militante.SECRETARIO_CC:
if militante.celula.setor.cr.cc_id == g.user.militante.celula.setor.cr.cc_id:
return True
# Secretário de CR pode gerenciar militantes do seu CR
if g.user.militante.responsabilidades & Militante.SECRETARIO_CR:
if militante.celula.setor.cr_id == g.user.militante.celula.setor.cr_id:
return True
# Secretário de Setor pode gerenciar militantes do seu setor
if g.user.militante.responsabilidades & Militante.SECRETARIO_SETOR:
if militante.celula.setor_id == g.user.militante.celula.setor_id:
return True
# Secretário de Célula pode gerenciar militantes da sua célula
if g.user.militante.responsabilidades & Militante.SECRETARIO_CELULA:
if militante.celula_id == g.user.militante.celula_id:
return True
return False
def can_manage_celula(celula_id):
"""Verifica se o usuário atual pode gerenciar uma célula específica."""
if not g.user or not g.user.militante:
return False
celula = Celula.query.get(celula_id)
if not celula:
return False
# Secretário Geral e Secretário de Organização podem gerenciar qualquer célula
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Secretário de CC pode gerenciar células do seu CC
if g.user.militante.responsabilidades & Militante.SECRETARIO_CC:
if celula.setor.cr.cc_id == g.user.militante.celula.setor.cr.cc_id:
return True
# Secretário de CR pode gerenciar células do seu CR
if g.user.militante.responsabilidades & Militante.SECRETARIO_CR:
if celula.setor.cr_id == g.user.militante.celula.setor.cr_id:
return True
# Secretário de Setor pode gerenciar células do seu setor
if g.user.militante.responsabilidades & Militante.SECRETARIO_SETOR:
if celula.setor_id == g.user.militante.celula.setor_id:
return True
return False
def can_manage_setor(setor_id):
"""Verifica se o usuário atual pode gerenciar um setor específico."""
if not g.user or not g.user.militante:
return False
setor = Setor.query.get(setor_id)
if not setor:
return False
# Secretário Geral e Secretário de Organização podem gerenciar qualquer setor
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Secretário de CC pode gerenciar setores do seu CC
if g.user.militante.responsabilidades & Militante.SECRETARIO_CC:
if setor.cr.cc_id == g.user.militante.celula.setor.cr.cc_id:
return True
# Secretário de CR pode gerenciar setores do seu CR
if g.user.militante.responsabilidades & Militante.SECRETARIO_CR:
if setor.cr_id == g.user.militante.celula.setor.cr_id:
return True
return False
def can_manage_cr(cr_id):
"""Verifica se o usuário atual pode gerenciar um CR específico."""
if not g.user or not g.user.militante:
return False
cr = CR.query.get(cr_id)
if not cr:
return False
# Secretário Geral e Secretário de Organização podem gerenciar qualquer CR
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Secretário de CC pode gerenciar CRs do seu CC
if g.user.militante.responsabilidades & Militante.SECRETARIO_CC:
if cr.cc_id == g.user.militante.celula.setor.cr.cc_id:
return True
return False
def can_manage_cc(cc_id):
"""Verifica se o usuário atual pode gerenciar um CC específico."""
if not g.user or not g.user.militante:
return False
# Apenas Secretário Geral e Secretário de Organização podem gerenciar CCs
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
return False
def can_manage_financas(instancia_id, tipo_instancia):
"""Verifica se o usuário atual pode gerenciar finanças de uma instância específica."""
if not g.user or not g.user.militante:
return False
# Secretário Geral e Secretário de Organização podem gerenciar finanças de qualquer instância
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Responsável de Finanças da instância pode gerenciar suas finanças
if tipo_instancia == 'celula':
celula = Celula.query.get(instancia_id)
if celula and celula.responsavel_financas_id == g.user.militante.id:
return True
elif tipo_instancia == 'setor':
setor = Setor.query.get(instancia_id)
if setor and setor.responsavel_financas_id == g.user.militante.id:
return True
elif tipo_instancia == 'cr':
cr = CR.query.get(instancia_id)
if cr and cr.responsavel_financas_id == g.user.militante.id:
return True
elif tipo_instancia == 'cc':
cc = CC.query.get(instancia_id)
if cc and cc.responsavel_financas_id == g.user.militante.id:
return True
return False
def can_manage_imprensa(instancia_id, tipo_instancia):
"""Verifica se o usuário atual pode gerenciar imprensa de uma instância específica."""
if not g.user or not g.user.militante:
return False
# Secretário Geral e Secretário de Organização podem gerenciar imprensa de qualquer instância
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Responsável de Imprensa da instância pode gerenciar sua imprensa
if tipo_instancia == 'celula':
celula = Celula.query.get(instancia_id)
if celula and celula.responsavel_imprensa_id == g.user.militante.id:
return True
elif tipo_instancia == 'setor':
setor = Setor.query.get(instancia_id)
if setor and setor.responsavel_imprensa_id == g.user.militante.id:
return True
elif tipo_instancia == 'cr':
cr = CR.query.get(instancia_id)
if cr and cr.responsavel_imprensa_id == g.user.militante.id:
return True
elif tipo_instancia == 'cc':
cc = CC.query.get(instancia_id)
if cc and cc.responsavel_imprensa_id == g.user.militante.id:
return True
return False
def can_manage_responsabilidades(militante_id):
"""Verifica se o usuário atual pode gerenciar responsabilidades de um militante específico."""
if not g.user or not g.user.militante:
return False
militante = Militante.query.get(militante_id)
if not militante:
return False
# Secretário Geral e Secretário de Organização podem gerenciar responsabilidades de qualquer militante
if g.user.militante.responsabilidades & (Militante.SECRETARIO_GERAL | Militante.SECRETARIO_ORGANIZACAO):
return True
# Secretário de CC pode gerenciar responsabilidades de militantes do seu CC
if g.user.militante.responsabilidades & Militante.SECRETARIO_CC:
if militante.celula.setor.cr.cc_id == g.user.militante.celula.setor.cr.cc_id:
return True
# Secretário de CR pode gerenciar responsabilidades de militantes do seu CR
if g.user.militante.responsabilidades & Militante.SECRETARIO_CR:
if militante.celula.setor.cr_id == g.user.militante.celula.setor.cr_id:
return True
# Secretário de Setor pode gerenciar responsabilidades de militantes do seu setor
if g.user.militante.responsabilidades & Militante.SECRETARIO_SETOR:
if militante.celula.setor_id == g.user.militante.celula.setor_id:
return True
return False

292
functions/rbac.py Normal file
View File

@@ -0,0 +1,292 @@
from sqlalchemy import Column, Integer, String, Text, ForeignKey, Table
from sqlalchemy.orm import relationship
from .base import Base
# Tabela de mapeamento Role-Permission
role_permissions = Table(
'role_permissions',
Base.metadata,
Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True),
Column('permission_id', Integer, ForeignKey('permissions.id'), primary_key=True)
)
# Tabela de mapeamento User-Role
user_roles = Table(
'user_roles',
Base.metadata,
Column('user_id', Integer, ForeignKey('usuarios.id'), primary_key=True),
Column('role_id', Integer, ForeignKey('roles.id'), primary_key=True)
)
class Role(Base):
__tablename__ = 'roles'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(50), unique=True, nullable=False)
nivel = Column(Integer, nullable=False) # Nível hierárquico
descricao = Column(Text)
# Relacionamentos
permissions = relationship("Permission", secondary=role_permissions, back_populates="roles")
users = relationship("Usuario", secondary=user_roles, back_populates="roles")
# Níveis de role
MILITANTE_BASICO = 1
SECRETARIO_CELULA = 2
MEMBRO_SETOR = 3
SECRETARIO_SETOR = 4
MEMBRO_CR = 5
SECRETARIO_CR = 6
MEMBRO_CC = 7
SECRETARIO_GERAL = 8
@staticmethod
def get_roles_list():
return [
(Role.MILITANTE_BASICO, "Militante Básico"),
(Role.SECRETARIO_CELULA, "Secretário de Célula"),
(Role.MEMBRO_SETOR, "Membro de Setor"),
(Role.SECRETARIO_SETOR, "Secretário de Setor"),
(Role.MEMBRO_CR, "Membro de CR"),
(Role.SECRETARIO_CR, "Secretário de CR"),
(Role.MEMBRO_CC, "Membro do CC"),
(Role.SECRETARIO_GERAL, "Secretário Geral")
]
class Permission(Base):
__tablename__ = 'permissions'
id = Column(Integer, primary_key=True, autoincrement=True)
nome = Column(String(50), unique=True, nullable=False)
descricao = Column(Text)
# Relacionamentos
roles = relationship("Role", secondary=role_permissions, back_populates="permissions")
# Permissões básicas
VIEW_OWN_DATA = "view_own_data"
EDIT_OWN_DATA = "edit_own_data"
VIEW_CELL_DATA = "view_cell_data"
CREATE_MILITANT = "create_militant" # Nova permissão para criar militantes
# Permissões de célula
MANAGE_CELL_MEMBERS = "manage_cell_members"
CREATE_CELL_MEMBER = "create_cell_member"
VIEW_CELL_REPORTS = "view_cell_reports"
REGISTER_CELL_PAYMENT = "register_cell_payment"
# Permissões de setor
MANAGE_SECTOR_CELLS = "manage_sector_cells"
CREATE_SECTOR_CELL = "create_sector_cell"
VIEW_SECTOR_REPORTS = "view_sector_reports"
REGISTER_SECTOR_PAYMENT = "register_sector_payment"
# Permissões de CR
MANAGE_CR_SECTORS = "manage_cr_sectors"
CREATE_CR_SECTOR = "create_cr_sector"
VIEW_CR_REPORTS = "view_cr_reports"
REGISTER_CR_PAYMENT = "register_cr_payment"
# Permissões de CC
MANAGE_CC_CRS = "manage_cc_crs"
CREATE_CC_CR = "create_cc_cr"
VIEW_CC_REPORTS = "view_cc_reports"
REGISTER_CC_PAYMENT = "register_cc_payment"
SYSTEM_CONFIG = "system_config"
@staticmethod
def get_permissions_list():
return [
# Permissões básicas
(Permission.VIEW_OWN_DATA, "Visualizar próprios dados"),
(Permission.EDIT_OWN_DATA, "Editar próprios dados"),
(Permission.VIEW_CELL_DATA, "Visualizar dados da célula"),
(Permission.CREATE_MILITANT, "Criar novos militantes"), # Nova permissão
# Permissões de célula
(Permission.MANAGE_CELL_MEMBERS, "Gerenciar membros da célula"),
(Permission.CREATE_CELL_MEMBER, "Criar membros na célula"),
(Permission.VIEW_CELL_REPORTS, "Visualizar relatórios da célula"),
(Permission.REGISTER_CELL_PAYMENT, "Registrar pagamentos da célula"),
# Permissões de setor
(Permission.MANAGE_SECTOR_CELLS, "Gerenciar células do setor"),
(Permission.CREATE_SECTOR_CELL, "Criar células no setor"),
(Permission.VIEW_SECTOR_REPORTS, "Visualizar relatórios do setor"),
(Permission.REGISTER_SECTOR_PAYMENT, "Registrar pagamentos do setor"),
# Permissões de CR
(Permission.MANAGE_CR_SECTORS, "Gerenciar setores do CR"),
(Permission.CREATE_CR_SECTOR, "Criar setores no CR"),
(Permission.VIEW_CR_REPORTS, "Visualizar relatórios do CR"),
(Permission.REGISTER_CR_PAYMENT, "Registrar pagamentos do CR"),
# Permissões de CC
(Permission.MANAGE_CC_CRS, "Gerenciar CRs"),
(Permission.CREATE_CC_CR, "Criar CRs"),
(Permission.VIEW_CC_REPORTS, "Visualizar relatórios nacionais"),
(Permission.REGISTER_CC_PAYMENT, "Registrar pagamentos nacionais"),
(Permission.SYSTEM_CONFIG, "Configurar sistema")
]
def init_rbac():
"""Inicializa o sistema RBAC com roles e permissões básicas"""
from .database import get_db_connection
session = get_db_connection()
try:
# Criar roles se não existirem
for nivel, nome in Role.get_roles_list():
role = session.query(Role).filter_by(nivel=nivel).first()
if not role:
role = Role(nome=nome, nivel=nivel)
session.add(role)
# Criar permissões se não existirem
for nome, descricao in Permission.get_permissions_list():
permission = session.query(Permission).filter_by(nome=nome).first()
if not permission:
permission = Permission(nome=nome, descricao=descricao)
session.add(permission)
session.commit()
# Mapear permissões para roles
for role in session.query(Role).all():
# Militante Básico
if role.nivel == Role.MILITANTE_BASICO:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first()
]
# Secretário de Célula
elif role.nivel == Role.SECRETARIO_CELULA:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CELL_PAYMENT).first()
]
# Membro de Setor
elif role.nivel == Role.MEMBRO_SETOR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first()
]
# Secretário de Setor
elif role.nivel == Role.SECRETARIO_SETOR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_SECTOR_PAYMENT).first()
]
# Membro de CR
elif role.nivel == Role.MEMBRO_CR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first()
]
# Secretário de CR
elif role.nivel == Role.SECRETARIO_CR:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CR_PAYMENT).first()
]
# Membro do CC
elif role.nivel == Role.MEMBRO_CC:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first()
]
# Secretário Geral
elif role.nivel == Role.SECRETARIO_GERAL:
role.permissions = [
session.query(Permission).filter_by(nome=Permission.VIEW_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.EDIT_OWN_DATA).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_DATA).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CELL_MEMBERS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CELL_MEMBER).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CELL_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_SECTOR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_SECTOR_CELLS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_SECTOR_CELL).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CR_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CR_SECTORS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CR_SECTOR).first(),
session.query(Permission).filter_by(nome=Permission.VIEW_CC_REPORTS).first(),
session.query(Permission).filter_by(nome=Permission.MANAGE_CC_CRS).first(),
session.query(Permission).filter_by(nome=Permission.CREATE_CC_CR).first(),
session.query(Permission).filter_by(nome=Permission.REGISTER_CC_PAYMENT).first(),
session.query(Permission).filter_by(nome=Permission.SYSTEM_CONFIG).first()
]
# Administrador
elif role.nome == "Administrador":
role.permissions = [
session.query(Permission).filter_by(nome=Permission.SYSTEM_CONFIG).first()
]
session.commit()
except Exception as e:
print(f"Erro ao inicializar RBAC: {e}")
session.rollback()
raise
finally:
session.close()