Code source de BaseFonctions

# -*- coding: utf-8 -*-

"""
Ce module définit des fonctions de base utiles un peu partout
"""

# On importe plein de modules (c'est pas forcément nécessaire, mais ça peut pas faire de mal...)
# useful
import os
import sys
import time
import re
import random
# communication
import socket
import ssl
import netaddr
import threading
# pour hasher les mdp
import hashlib
# les objets échangés sont des objets json
import json
# module d'accès à la bdd
import psycopg2
import psycopg2.extras
# Pour avoir les résultats en unicode
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
# nécessaires pour le typage
import datetime

# Import du fichier de conf
sys.path.append("../config/")
import config
# module qui définit les erreurs
import ExceptionsNote


#: Lock unique utilisé afin que deux threads ne mélangent pas leur messages de debug
debug_lock = threading.Lock()

[docs]def getcursor(): """Ouvre une connexion à la base de données et renvoie la pare ``(connexion, curseur)``. En effet, il faut garder la variable ``connexion``, sinon, elle est garbage-collectée, ce qui ferme la connexion à la base. """ con = psycopg2.connect(database=config.database, user=config.pgsql_user) con.set_client_encoding("utf-8") return (con, con.cursor(cursor_factory=psycopg2.extras.DictCursor))
[docs]def isPgsqlDate(s): """Vérifie si l'objet passé est une date PgSQL ou si la chaîne de caractère peut être transformée en date par pgsql. """ if isinstance(s, (str, unicode)): con, cur = getcursor() try: cur.execute("SELECT (CAST (%s as timestamp));", (s,)) except: return False return True else: return type(s) in [datetime.datetime, datetime.date]
[docs]def executable_cmd(cmd): """Retourne True si ``cmd`` peut être exécutée par le client. Ne garantit pas que la commande existe vraiment. """ return not cmd.startswith("_")
[docs]def inOrInAliases(terme, liste, aliases, sousdroit=False): """Cherche si ``terme`` est dans ``liste`` ou bien s'il est dans une des valeur du dico ``aliases`` dont la clé la clé est dans ``liste``. Si sousdroit est mis à True, cherche également si liste contient un sous-droit de terme. *NB : Les alias* ``"all"`` *et* ``"root"`` *sont hardcodés.* """ if "all" in liste or "root" in liste: return True terme_in_liste = terme in liste and not terme.startswith("_") stronger_in_liste = any([(alias in liste) and (terme in aliases[alias]) for alias in aliases.keys() if not alias.startswith("_")]) weaker_in_liste = (terme in aliases) and any([(ssd in liste) for ssd in aliases[terme]]) return terme_in_liste or stronger_in_liste or (sousdroit and weaker_in_liste)
[docs]def inOrInAliasesWithMask(terme, liste, aliases, masque, sousdroit=False): """``inOrInAliases(terme, liste, aliases) and not inOrInAliases(terme, masque, aliases)``""" droit = inOrInAliases(terme, liste, aliases, sousdroit=sousdroit) # "overforced => forced" hardcodé droit = droit or (terme == "forced" and "overforced" in liste) masqued = inOrInAliases(terme, masque, aliases) # On a le droit si on a le droit (thanks captain) et qu'il n'est pas dans le masque return droit and not masqued
[docs]def debug(debuglevel, text, name=u"", linejumps=0): """Afficher des informations de debug, sur stdout et/ou dans le fichier de log. Certains messages sont droppés, ça dépend du ``debug_level``. Voir :py:data:`config.debug_level_logfile` et :py:data:`config.debug_level_stdout`. """ if isinstance(text, Exception): text = u"Exception : %s\n%s" % (type(text), text) if not isinstance(text, unicode): raise ExceptionsNote.BadDebugTextType debugout = u"\n" * linejumps + time.strftime(u"%Y/%m/%d %H:%M:%S ") debugout += (name or u"Debug") + u" :: " + text debug_lock.acquire() if config.debug_stdout and (debuglevel <= config.debug_level_stdout): print debugout.encode("utf-8") if config.debug_logfile and (debuglevel <= config.debug_level_logfile): debug_file = open(config.logfile, "a") try: debug_file.write(debugout + u"\n") except: pass debug_file.close() debug_lock.release()
[docs]def log(ip, utilisateur, fonction, cur, params="", relevant_ids=[]): """Inscrit une action effectuée par utilisateur dans la table de log.""" # on vérifie que l'IP est bien une IP try: netaddr.IPAddress(ip) except AddrFormatError: raise ExceptionsNote.TuTeFousDeMaGueule("logging : L'ip fournie n'est pas reconnue par netaddr.IPAddress") req = "INSERT INTO log (ip, utilisateur, fonction, params, ids) VALUES (%s, %s, %s, %s, %s);" if isinstance(params, str): params = params.decode("utf-8") elif not isinstance(params, unicode): params = unicode(params) cur.execute(req, (ip, utilisateur, fonction, params, relevant_ids))
[docs]def random_chain(size, saltsize=0): """Donne une chaîne de caractères aléatoire ainsi que son hash sha256. Le hash est salé avec un sel de longueur ``saltsize``. """ chars = "azertyuiopqsdfghjklmwxcvbn0123456789" chain = "" for i in range(size): chain += random.choice(chars) if saltsize != 0: salt = "" for i in range(saltsize): salt += random.choice(chars) return (chain, salt + "|" + hashlib.sha256(salt + chain).hexdigest()) else: return (chain, hashlib.sha256(chain).hexdigest())
[docs]def hash_pass(passwd): """Hashe le password et renvoie <sel>|<hash>.""" (salt, plop) = random_chain(8) hashedpass = salt + '|' + hashlib.sha256((salt + passwd).encode("utf-8")).hexdigest() return hashedpass
[docs]def hash_mail(mail, idbde): """ Génère un hash à partir de l'adresse ``mail`` d'un compte, de son ``idbde``, et d'un secret accessible uniquement sur le serveur. """ tohash = "%s|%s|%s" % (mail, idbde, config.secret_key) hashed = hashlib.sha256(tohash).hexdigest() return hashed
[docs]def pseudo_libre(pseudo, idbde=None): """Dit si un pseudo est utilisable ou non. Le test est insensible à la casse : si ``"PseuDo"`` est pris, on n'autorise pas l'utilisation de ``"pSeudO"``. * Un pseudo ne peut pas être vide. * Un pseudo ne peut pas commencer par ``"#"``. * Un pseudo déjà utilisé par quelqu'un n'est pas utilisable… * … ou si c'est un pseudo que quelqu'un possédait il y a peu (peu = ``configurations.historique_incompressible``) * … ou si c'est un alias que quelqu'un possède. *NB : si l'utilisateur qui a utilisé ce pseudo il n'y a pas longtemps est moi-même, je peux le reprendre quand même.* """ if (len(pseudo) == 0) or (pseudo[0] == "#"): return False pseudo = pseudo.lower() con, cur = getcursor() # pseudo utilisé cur.execute("SELECT idbde FROM comptes WHERE LOWER(pseudo) = %s;", (pseudo,)) # On a le droit de reprendre son propre pseudo # Ça a un sens parce que, par exemple, on peut avoir envie de changer sa casse idbdes = [l["idbde"] for l in cur.fetchall() if l["idbde"] != idbde] currently_used = (len(idbdes) > 0) # pseudos qui sont dans l'historique depuis moins de <historique_incompressible> heures cur.execute("""SELECT idbde FROM historique WHERE LOWER(avant) = %s AND date > (now() - (SELECT CAST((historique_incompressible || 'h') AS interval) FROM configurations WHERE used = true));""", (pseudo,)) # Sauf que si c'est justement l'utilisateur qui le demande, il a le droit de reprendre son ancien # pseudo sans attendre la fin du timeout incompressible liste = [l for l in cur.fetchall() if l["idbde"] != idbde] recently_used = (len(liste) > 0) # aliases cur.execute("SELECT idbde FROM aliases WHERE LOWER(alias) = %s;", (pseudo,)) alias_used = (len(cur.fetchall()) > 0) return not(currently_used or recently_used or alias_used)
[docs]def default_pseudo(nom, prenom, startwith=None): """Génère le pseudo par défaut à partir du nom et du prénom du compte. Un pseudo n'a pas le droit de commencer par un #, donc on droppe tout # dans ``prenom`` et ``nom``, par sécurité. """ if startwith: pseudotry = startwith else: prenom, nom = prenom.replace(u"#", u""), nom.replace(u"#", u"") # On initialise le pseudo à "prenom" + "1ère lettre du nom si il y en a au moins une" pseudotry = (prenom + nom[:1]).lower() # On teste une première fois if pseudo_libre(pseudotry): return pseudotry # Sinon, on rajoute successivement les lettres suivantes du nom for lettre in nom[1:]: pseudotry += lettre.lower() if pseudo_libre(pseudotry): return pseudotry # Si on arrive là, c'est que prenom + nom_entier est déjà pris aussi, on rajoute donc un nombre basetry = pseudotry i = 1 while True: # On finira bien par y arriver pseudotry = basetry + unicode(i) if pseudo_libre(pseudotry): return pseudotry i += 1
[docs]def pg_parse(obj): """Renvoie essentiellement l'objet lui-même mais en le modifiant s'il est pgsql-typé en quelquechose de JSON-izable.""" if obj == None: return obj if isinstance(obj, (int, long, str, unicode, float, bool)): # cas de base return obj if type(obj) == list: return [pg_parse(i) for i in obj] if isPgsqlDate(obj): return str(obj) if isinstance(obj, dict): return {pg_parse(key) : pg_parse(val) for (key, val) in obj.items()} # Si on en arrive là, on ne sait pas trop quoi faire. # On va essayer de supposer que l'objet fourni est un résultat PgSQL, donc pas tout à # fait une liste (non reconnu dans type(obj) == list) # on va en faire un dico pour le parser try: obj = dict(obj) return pg_parse(obj) except Exception as exc: raise raise ExceptionsNote.PgParseError("Cet objet n'est pas prévu dans pg_parse : %s" % (obj,))
[docs]def isAdherent(idbde, date="now()"): """Dit si l'adhérent n° ``idbde`` est à jour d'adhésion à la date fournie, ou, à défaut, maintenant. Pour le même prix, renvoie sa section. """ # En fait, finalement c'est codé à l'échelle SQL alors pourquoi s'embêter ? :) con, cur = getcursor() cur.execute("SELECT * FROM isAdherent(%s, CAST(%s AS date));", (idbde, date)) l = cur.fetchone() return (l["answer"], l["section"], l["section_year"])
[docs]def adhesion_current_year(): """Calcule l'année d'inscription par défaut en fonction de la configuration et de la date actuelle.""" y, m, d = time.localtime()[:3] con, cur = getcursor() cur.execute("SELECT start_next_year_month, start_next_year_day FROM configurations WHERE used = true;") snym, snyd = cur.fetchone() if (m, d) < (snym, snyd): y -= 1 return y
[docs]def expandAliases(liste): """Récupère une liste de droits, et cherche si dedans il y a des aliases, le cas échéant, les in-line (sans pour autant les enlever de la liste). """ temp_data = [config.droits_aliases_bdd.get(k, []) + [k] for k in liste] return list(set([]).union(*temp_data)) # attention à l'étoile.
[docs]def hasMoreRights(compte1, compte2): """Fonction pour contrôler en cas de suppression si le ``compte2`` possède certains droits que le ``compte1`` n'aurait pas, et retourne ``False`` le cas échéant. Les droits en question sont contenus dans la liste :py:data:`config.no_delete_droits`. """ # Comparaison des suprêmes more_supreme = compte1.supreme or not compte2.supreme # Comparaison des surdroits intersection = set(expandAliases(compte2.get_surdroits())).intersection(config.no_delete_droits) more_surdroits = intersection.issubset(expandAliases(compte1.get_surdroits())) # Comparaison des droits intersection = set(expandAliases(compte2.get_droits())).intersection(config.no_delete_droits) more_droits = intersection.issubset(expandAliases(compte1.get_droits())) return more_supreme and more_surdroits and more_droits
[docs]class Compte(object): """Permet de faire quelques opérations sur un compte comme l'afficher, récupérer ses droits, sa section…""" def __init__(self, dico, section_data=None): """Création de l'objet compte, à partir d'un dico, qui peut directement être un résultat psycopg2. On fournit aussi la section à part. """ self.__dict__.update(dico) if section_data is None: self.is_adherent, self.section, self.section_year = False, "Unknown", -1 else: self.is_adherent, self.section, self.section_year = section_data def __str__(self): """Pretty print d'un compte.""" affiche = """ Identifiant bde : %s Supprimé : %s Type de compte : %s Pseudo : %s Aliases : %s Anciens pseudos : %s (encore valides) Solde : %s Nom : %s Prenom : %s Section : %s Numéro de téléphone : %s Mail : %s Adresse : %s Fonction : %s Normalien : %s Droits : %s Surdroits : %s Supreme : %s Rapports : Période (minutes) : %s Dernier : %s Prochain : %s Compte bloqué : %s""" trad_bool = {True: "oui", False: "non"} normalien, supreme, bloque = map(lambda x: trad_bool[bool(x)], [self.normalien, self.supreme, self.bloque]) return affiche % (self.idbde, self.deleted, self.type, self.pseudo, ", ".join(self.get_aliases()), ", ".join(self.get_historiques()), self.solde, self.nom, self.prenom, self.section, self.tel, self.mail, self.adresse, self.fonction, normalien, self.droits, self.surdroits, supreme, self.report_period, self.previous_report_date, self.next_report_date, bloque)
[docs] def modify(self, dico): """Modifie l'objet courant (ne sauvegarde pas les modfis dans la BDD).""" self.__dict__.update(dico)
[docs] def save(self, cur=None): """Modifie un adhérent (ne fait aucune vérification). Le ``__dict__`` a intérêt à contenir tout ce qu'il faut. Si ``cur`` est fourni, le curseur n'est pas COMMITé. """ cur_given = True if cur == None: con, cur = getcursor() cur_given = False req = "UPDATE comptes SET tel = %(tel)s, next_report_date = %(next_report_date)s, surdroits = %(surdroits)s, supreme = %(supreme)s, bloque = %(bloque)s, fonction = %(fonction)s, normalien = %(normalien)s, droits = %(droits)s, mail = %(mail)s, type = %(type)s, nom = %(nom)s, report_period = %(report_period)s, passwd = %(passwd)s, pbsante = %(pbsante)s, adresse = %(adresse)s, previous_report_date = %(previous_report_date)s, prenom = %(prenom)s, pseudo = %(pseudo)s, commentaire = %(commentaire)s, deleted = %(deleted)s WHERE idbde = %(idbde)s;" cur.execute(req, self.__dict__) if not cur_given: cur.execute("COMMIT;")
[docs] def get_droits(self): """Renvoie la liste des droits du compte.""" return self.droits.split(',')
[docs] def get_surdroits(self): """Renvoie la liste des surdroits du compte.""" return self.surdroits.split(',')
[docs] def get_aliases(self): """Va chercher les aliases de l'adhérent. Les renvoie sous forme de liste de paires ``(<id>, <alias>)``. """ con, cur = getcursor() cur.execute("SELECT id, alias FROM aliases WHERE idbde = %s;", (self.idbde,)) l = cur.fetchall() return [[i["id"], i["alias"]] for i in l]
[docs] def get_historiques(self, old=False): """Va chercher les anciens pseudos de l'adhérent. Par défaut, seulement ceux qui le référencent encore, sauf si ``old = True``. Les renvoie sous forme de liste. """ if old: valideclause = "" else: valideclause = " AND valide" con, cur = getcursor() cur.execute("SELECT avant FROM historique WHERE idbde = %s" + valideclause + ";", (self.idbde,)) l = cur.fetchall() return [i[0] for i in l]
[docs] def get_annees(self): """Renvoie les années où le compte a adhéré au BDE.""" con, cur = getcursor() cur.execute("SELECT annee FROM adhesions WHERE idbde=%s ORDER BY annee;", (self.idbde,)) l = cur.fetchall() return [i["annee"] for i in l]
[docs] def get_data(self, acl_wei): """Renvoie un dico contenant toutes les informations sur le compte. ``pbsante`` n'est fournis que si ``acl_wei = True``.""" data = { "idbde": self.idbde, "type": self.type, "pseudo": self.pseudo, "aliases": self.get_aliases(), "historiques": self.get_historiques(), "solde": self.solde, "nom": self.nom, "prenom": self.prenom, "is_adherent": self.is_adherent, "section": self.section, "section_year": self.section_year, "tel": self.tel, "mail": self.mail, "adresse": self.adresse, "fonction": self.fonction, "normalien": self.normalien, "droits": self.droits, "surdroits": self.surdroits, "supreme": self.supreme, "report_period": self.report_period, "previous_report_date": self.previous_report_date, "next_report_date": self.next_report_date, "bloque": self.bloque, "commentaire": self.commentaire, "annees": self.get_annees(), "deleted": self.deleted, } if acl_wei: data["pbsante"] = self.pbsante return data
[docs] def anonymise(self): """Anonymise un compte, peut être appelé lors de sa "suppression".""" # Suppression de ses alias con, cur = getcursor() cur.execute("DELETE FROM aliases WHERE idbde = %s;", (self.idbde,)) cur.execute("COMMIT;") self.nom = u"Deleted" self.prenom = u"Deleted" self.pseudo = default_pseudo("", "", startwith="deleted%s_" % (self.idbde,)) self.tel = u"" self.mail = u"nobody@crans.org" self.adresse = u"" self.fonction = u"" self.normalien = False self.droits = u"" self.surdroits = u"" self.supreme = False self.report_period = -1 self.next_report_date = u"2042-01-01 00:13:37" self.bloque = True # Oui, il y a besoin du .decode. Si on donne un unicode à strftime, il UnicodeEncodeError self.commentaire = u"Supprimé %s." % (time.strftime("le %d/%m/%Y à %T").decode("utf-8")) self.pbsante = u"" self.save()
[docs]def sql_pretty_print(liste, keys=None): """Affiche (presque) la même chose que la sortie obtenue dans un prompt psql. Si ``keys`` n'est pas fourni, l'ordre des champs ne sera pas toujours le même. Crashe si ``liste`` est vide. """ if len(liste) == 0: raise ExceptionsNote.EmptyResult("Impossible d'afficher un résultat vide.") if keys == None: keys = liste[0].keys() # On unicodifie for iligne in range(len(liste)): for k, v in liste[iligne].items(): liste[iligne][k] = (u"%s" % v) # On cherche la largeur des colonnes liste_plus_keys = liste + [{k: k for k in keys}] maxs = {champ: max([len(i[champ]) for i in liste_plus_keys]) for champ in keys} # On aligne à gauche par défaut, mais pas si c'est un nombre nombres = [int, float, long] templateline = u" "+ u" | ".join([u"%%(%s)%s%ss" % (key, u"-"*(not type(liste[0][key]) in nombres), maxs[key]) for key in keys]) + u" \n" output = templateline % {k: k for k in keys} output += (templateline % {k: "" for k in keys}).replace(" ", "-").replace("|", "+") for ligne in liste: output += templateline % ligne return output