Code source de AuthService

# -*- coding: utf-8 -*-

"""Définition des méthodes d'authentification et de gestion de droits."""

# useful
import os
import sys
import time
import re
import random
# communication
import socket
import ssl
import threading
# pour encoder/décoder les photos
import base64
# pour hasher les mdp
import hashlib
# les objets échangés sont des objets json
import json

# module qui a des accès à la base de données préprogrammés (car souvent utilisés)
import ReadDatabase
# Définition de la classe Compte
from BaseFonctions import Compte
# module qui définit des fonctions essentielles
import BaseFonctions
# Import du fichier de conf
sys.path.append("../config/")
import config
# module qui définit les erreurs
import ExceptionsNote


[docs]class AuthService: """Classe donnant accès aux services d'authentification."""
[docs] def debug(self, debuglevel, text=""): """Affiche des informations de debug.""" BaseFonctions.debug(debuglevel, text, u"AuthService")
[docs] def update_droits_connus(self, userid, new_droits): """Update la liste ``self.droits_connus_bdd``""" # évidemment s'il était pas connu ça le rend connu self.droits_connus_bdd[userid] = new_droits
[docs] def del_droits_connus(self, userid): """Supprime les droits d'un user (attention, ces droits ne sont pas forcéments vraiment là).""" if self.droits_connus_bdd.has_key(userid): del self.droits_connus_bdd[userid]
[docs] def login(self, user, password, auth_type="bdd", byidbde=False): """Authentification. Le password est envoyé en clair. (on est en ssl) Renvoie également l'utilisateur bdd, si il y a lieu. """ if (auth_type == "bdd"): try: if byidbde: futur_user = Compte(ReadDatabase.get_compte(user)) else: futur_user = Compte(ReadDatabase.get_compte_by_pseudo(user)) dbpass = futur_user.passwd salt, db_hashed_pass = dbpass.split('|') if (hashlib.sha256((salt + password).encode("utf-8")).hexdigest() == db_hashed_pass): # Si le login a réussi, on force le rafraîchissement des droits self.del_droits_connus(futur_user.idbde) return (True, futur_user) else: return (False, None) except ExceptionsNote.Error404: self.debug(3, u"Authentification failed : pseudo %s inconnu" % user) return (False, None) elif (auth_type == "special"): logins = self.get_logins() if logins.has_key(user): (dbpass, dbacl) = logins[user] salt, db_hashed_pass = dbpass.split('|') if (hashlib.sha256(salt + password).hexdigest() == db_hashed_pass): return (True, dbacl) return (False, None) else: self.debug(2, "auth_type unknown : %s" % (auth_type)) return (False, None)
[docs] def adduser(self, user, password, newacl): """Ajout d'un utilisateur spécial ou mise à jour de son mot de passe et/ou de ses autorisations.""" self.lock.acquire() logins = self.get_logins() if (password == "-"): if logins.has_key(user): (oldpass, oldacl) = logins[user] logins[user] = (oldpass, newacl) else: self.debug(3, u"Utilisateur " + user + u" inconnu, mise a jour impossible") else: if logins.has_key(user): self.debug(1, u"Mise à jour de l'utilisateur " + user) else: self.debug(1, u"Création de l'utilisateur " + user) salt, plop = BaseFonctions.random_chain(5) password = salt + '|' + hashlib.sha256(salt + password).hexdigest() logins[user] = (password, newacl) self.write_logins(logins) self.lock.release()
[docs] def deluser(self, user): """Suppression d'un utilisateur.""" self.lock.acquire() logins = self.get_logins() if logins.has_key(user): logins.pop(user) self.lock.release() ok = True else: ok = False self.write_logins(logins) self.lock.release() return ok
[docs] def get_logins(self): """Récupère la liste des identifiants des utilisateurs spéciaux dans le fichier JSON. En son absence, prend un dicionnaire vide. """ try: with open(config.authfile, 'r') as f: return json.load(f) except: # Le fichier de login n'existe pas return {}
[docs] def write_logins(self, logins): """Écrit le fichier JSON contenant les identifiants des utilisateurs spéciaux.""" with open(config.authfile, 'w') as f: json.dump(logins, open(config.authfile, 'w'))
[docs] def get_acl(self, userid): """Renvoie les acl d'un user bdd, si ils ne sont pas en mémoire, va les chercher dans la base.""" if self.droits_connus_bdd.has_key(userid): return self.droits_connus_bdd[userid] else: try: compte = Compte(ReadDatabase.get_compte(userid)) except ExceptionsNote.Error404 as exc: raise ExceptionsNote.Error404("Authentification failed : " + str(exc)) acl = [compte.get_droits(), compte.get_surdroits(), compte.supreme] self.update_droits_connus(userid, acl) # oui, un update peut créer return acl
[docs] def has_acl(self, userid, access, surdroit=False, masque=[[], [], False], sousdroit=False): """Répond si un utilisateur (bdd !) a le droit/surdroit demandé.""" # Avant toutes choses, certains droits sont des privilèges des # utilisateurs spéciaux. # Même si un user bdd l'a dans sa liste de droits, il ne l'a pas vraiment if access in config.special_privileges: return False # On va chercher dans la config les alias de droits aliases = config.droits_aliases_bdd # On va chercher les acl de cet user acl = self.get_acl(userid) if surdroit: # On cherche non pas à savoir si l'user a le droit access mais le *surdroit* access. surdroits = acl[1] return BaseFonctions.inOrInAliasesWithMask(access, surdroits, aliases, masque[1],sousdroit=sousdroit) else: if (access == "supreme"): # c'est un droit particulier return (acl[2] and not masque[2]) #masque[2] et le booléen correspondant à supreme else: droits = acl[0] return BaseFonctions.inOrInAliasesWithMask(access, droits, aliases, masque[0],sousdroit=sousdroit)
def __init__(self, masterserver): """Initialisation de la classe.""" self.lock = threading.Lock() # Une référence bien utile vers le MainServer self.masterserver = masterserver # On ne va chercher les droits dans la base que si on ne les connait pas self.droits_connus_bdd = {} # On maintient une liste des timestamp de dernière activité des gens self.last_action_timestamps = {}