Code source de note.utilities

#!/usr/bin/env python
# -*- encoding: utf-8 -*-

"""Fonctions secondaires pour contruire les pages.
   
   Attention, ce module est en trois parties dont voici les descriptions :
   
   Les méthodes _get
   %%%%%%%%%%%%%%%%%
   
   Les méthodes pour faire des requêtes au serveur NK2015
   
   La convention est la suivante :
    - en cas de réussite, on renvoie l'objet demandé
    - en cas d'échec, on raise une erreur :py:class:`note.basic.NotFound` avec en attribut ``.fallback_with``
      l'adresse de redirection
   
   La gestion des photos
   %%%%%%%%%%%%%%%%%%%%%
   
   Quelques fonctions pour stocker et fournir les photos
   
   La gestion commune à toutes les pages
   %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
   
   Dans un soucis de factorisation de code, tout ce qui a besoin d'être fait sur toutes les pages
   (à part login et logout) est dans ces fonctions.
   
   Documentation successive des trois parties
   %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
   
   """

import json
import os
import shutil
import base64
import functools

# imports from Django
from django.http import HttpResponse, HttpResponseRedirect
from django.shortcuts import render

# Les paramètres django
import settings

# Les messages
import messages

# La communication avec le backend
import nk

import basic
# On importe dans le namespace courant les erreurs levées usuellement par les _get_qqc
from basic import NotFound, IllegalId

[docs]def _get_activites(sock, isadmin, isold, request, fallback='%sindex/' % (settings.NOTE_ROOT_URL,), computecandelete=False, whoami=None, mine=False): """Récupère la liste des activités""" if mine: flags = "m" elif isold: flags = "o" * isadmin else: flags = "A" * isadmin sock.write(json.dumps(["get_activites", ["", flags]])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): liste_activites = out["msg"] for i in range(len(liste_activites)): # on post-processe le champ "invitable" liste_activites[i]["caninvite"], liste_activites[i]["fails"] = liste_activites[i]["invitable"] if computecandelete: # On détermine si il est possible de supprimer l'activité # (indépendamment du fait qu'elle contient peut-être des invités, ça ce sera vérifié plus tard) # On peut supprimer si (on en est respo ET elle est pas validée) OU (on est admin) liste_activites[i]["candelete"] = isadmin or (liste_activites[i]["responsable"] == whoami["idbde"] and liste_activites[i]["validepar"] is None) return liste_activites else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_activite(sock, idact, request, fallback='%sactivites/' % (settings.NOTE_ROOT_URL,), computecandelete=False, whoami=None, isadmin=False): """Récupère une activité""" sock.write(json.dumps(["get_activite", idact])) out = nk.full_read(sock) if out["retcode"] == 404: messages.add_error(request, messages.ERRMSG_IDACT_FAIL % (idact)) raise NotFound(fallback) elif not nk._is_success_code(out["retcode"]): messages.add_error(request, out["errmsg"]) raise NotFound(fallback) else: activite = out["msg"] if computecandelete: # On détermine si il est possible de supprimer l'activité # (indépendamment du fait qu'elle contient peut-être des invités, ça ce sera vérifié plus tard) # On peut supprimer si (on en est respo ET elle est pas validée) OU (on est admin) activite["candelete"] = isadmin or (activite["responsable"] == whoami["idbde"] and activite[i]["validepar"] is None) return out["msg"]
[docs]def _get_invites(sock, idact, isadmin, request, fallback='%sactivites/' % (settings.NOTE_ROOT_URL,)): """Récupère la liste des invités d'une activité. Il ne peut pas y avoir d'échec 404, au pire la liste est vide.""" flags = "A" * isadmin data = [idact, flags] sock.write(json.dumps(["get_invites", data])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): return out["msg"] else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_variable_droits(sock, request, fallback='%sindex/' % (settings.NOTE_ROOT_URL)): """Récupère les droits existants renvoie une paire ``(<liste de (<alias qui le donne>, <droit>), <liste de droits sans alias>)``""" sock.write(json.dumps(["mayi", "droits"])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): # il faut organiser un peu tout ça : dicodroits = out["msg"] keys = dicodroits["_keys"] noalias = dicodroits["_noalias"] del dicodroits["_keys"] del dicodroits["_noalias"] droits = [(key, dicodroits[key]) for key in keys] return (droits, noalias) else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_compte(sock, idbde, request, fallback='%scomptes/' % (settings.NOTE_ROOT_URL,)): """Récupère un compte""" sock.write(json.dumps(["compte", idbde])) out = nk.full_read(sock) if out["retcode"] == 404: messages.add_error(request, messages.ERRMSG_IDBDE_FAIL % (idbde)) raise NotFound(fallback) elif not nk._is_success_code(out["retcode"]): messages.add_error(request, out["errmsg"]) raise NotFound(fallback) else: compte = out["msg"] # Post-processing for field in ["droits", "surdroits"]: compte[field] = compte[field].split(",") # ATTENTION : u''.split(",") == [u''] c'est débile, mais c'est comme ça if compte[field] == [u'']: compte[field] = [] return compte
[docs]def _get_historique_pseudo(sock, idbde, request, fallback='%sindex/' % (settings.NOTE_ROOT_URL,)): """Récupère un compte et ses anciens pseudos. Place l'historique des pseudos dans la clé ``"historique_pseudo"`` du résultat.""" compte = _get_compte(sock, idbde, request, fallback=fallback) sock.write(json.dumps(["historique_pseudo", idbde])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): histos = out["msg"] compte["historique_pseudo"] = histos return compte else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_aliases(sock, idbde, request, fallback='%sindex/' % (settings.NOTE_ROOT_URL,)): """Récupère un compte et post-processe ses alias""" # Une exception NotFound peut être levée par _get_compte compte = _get_compte(sock, idbde, request, fallback=fallback) compte["aliases"] = [{"id": alias[0], "alias": alias[1]} for alias in compte["aliases"]] return compte
def _get_boutons_categories(sock, request, all, fallback='%sindex/' % (settings.NOTE_ROOT_URL,)): sock.write(json.dumps(["get_boutons_categories", all,])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): return out["msg"] else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_boutons(sock, request, hidden=False, fallback='%sindex/' % (settings.NOTE_ROOT_URL,)): """Récupère tous les boutons et les ordonne par (catégorie, label)""" options = ["", ""] if hidden: options.append("all") to_send = ["get_boutons", options] sock.write(json.dumps(to_send)) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): boutons = out["msg"] boutons.sort(lambda x, y: cmp((x["categorie"].lower(), x["label"].lower()), (y["categorie"].lower(), y["label"].lower()))) return boutons else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_un_bouton(sock, idbouton, request, fallback='%sindex/' % (settings.NOTE_ROOT_URL,)): """Récupère un bouton""" sock.write(json.dumps(["get_un_bouton", idbouton])) out = nk.full_read(sock) if out["retcode"] == 404: messages.add_error(request, messages.ERRMSG_IDBUTTON_FAIL % (idbouton)) raise NotFound(fallback) elif not nk._is_success_code(out["retcode"]): messages.add_error(request, out["errmsg"]) raise NotFound(fallback) else: return out["msg"]
[docs]def _get_clubs(sock, request, fallback='%sindex/' % (settings.NOTE_ROOT_URL,)): """Récupère la liste des (idbde, pseudo) des clubs""" sock.write(json.dumps(["get_clubs"])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): return out["msg"] else: messages.add_error(request, out["errmsg"]) raise NotFound(fallback)
[docs]def _get_preinscription(sock, preid, request, fallback='%sinscriptions/' % (settings.NOTE_ROOT_URL,)): """Récupère une préinscription""" sock.write(json.dumps(["get_preinscription", preid])) out = nk.full_read(sock) if out["retcode"] == 404: messages.add_error(request, messages.ERRMSG_PREID_FAIL % (preid)) raise NotFound(fallback) elif not nk._is_success_code(out["retcode"]): messages.add_error(request, out["errmsg"]) raise NotFound(fallback) else: return out["msg"]
[docs]def _get_preinscriptions(sock, request, fallback='%sindex' % (settings.NOTE_ROOT_URL,)): """Récupère la liste des préinscriptions""" sock.write(json.dumps(["get_preinscriptions"])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): return out["msg"] else: raise NotFound(fallback)
[docs]def _get_wei(sock, idwei, request, fallback='%sindex' % (settings.NOTE_ROOT_URL,)): """Récupère les informations d'une préinscription wei. """ sock.write(json.dumps(["wei_get_info", idwei])) out = nk.full_read(sock) if not nk._is_success_code(out["retcode"]): message.add_error(request, out["errmsg"]) raise NotFound(fallback) else: return out["msg"]
[docs]def _get_full_rights(sock): """Récupère la liste exhaustive des droits et surdroits du compte (alias déréférencés).""" sock.write(json.dumps(["mayi", "full_rights"])) out = nk.full_read(sock) return out["msg"]["droits"], out["msg"]["surdroits"]
[docs]def _get_tarifs_adhesion(sock): """Récupère les tarifs d'adhésion wei/pas wei normalien/pas normalien.""" sock.write(json.dumps(["get_tarifs_adhesion"])) tarifs = nk.full_read(sock)["msg"] return tarifs
[docs]def _prepare_variables(sock, idbde, request, form=False, whoami=None): """Prépare plein de variables pour l'affichage/la modification des comptes. (Le post-processing n'est pas forcément le même) Renvoie un dico des variables récupérées""" # Une exception NotFound peut être levée par _get_compte compte = _get_compte(sock, idbde, request) vars = {} # On rend quelques champs un peu plus présentables # les listes compte["historiques"] = ", ".join(compte["historiques"]) compte["aliases"] = ", ".join([al[1] for al in compte["aliases"]]) compte["annees"] = ", ".join([str(i) for i in compte["annees"]]) vars["compte"] = compte # pour les infos qui ne sont accessibles qu'aux droits wei vars["has_wei"] = compte.has_key("pbsante") # on aimerait connaître les droits qui existent vars["droits"], vars["droits_noalias"] = _get_variable_droits(sock, request) if form: # on a aussi besoin de connaître la liste exhaustive des droits du compte courant full_rights, full_overrights = _get_full_rights(sock) whoami["full_overrights"] = full_overrights whoami["full_rights"] = full_rights vars["whoami"] = whoami else: # on fournit la photo _provide_photo(sock, idbde) vars["urlphoto"] = _get_url_photo(idbde) return vars
[docs]def _del_activite(sock, request, idact): """Supprime une activité (le Redirect en cas d'échec est à la charge de l'appelant)""" sock.write(json.dumps(["del_activite", [idact, "A"]])) # pareil, on demande toujours l'admin au cas où out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): messages.add_success(request, messages.SUCCMSG_DELACT) return True else: messages.add_error(request, out["errmsg"]) return False
[docs]def _get_remise(sock): """Récupère la liste des ``idremise`` des remises non closes""" sock.write(json.dumps(["remises_open"])) out = nk.full_read(sock) ids = [] for id in out["msg"]: ids.append(id["id"]) if True : #TODO nk.is_success_code(out["retcode"]): #TODO ajouter le messages.add_success return ids else: #TODO ajouter le messages.add_error(request, out["errmsg"] return None
################################################################# ## Gestion des photos ## #################################################################
[docs]def _get_photoname(idbde): """Donne le nom de fichier de la photo n°``idbde``""" return "%s.png" % (idbde)
[docs]def _get_photopath(idbde): """Donne le chemin de la photo n°``idbde``""" return settings.PHOTOS_PATH + _get_photoname(idbde)
[docs]def _get_url_photo(idbde): """Donne l'url d'accès à la photo n°``idbde``""" return settings.PHOTOS_URL + _get_photoname(idbde)
[docs]def _store_photo(photodata, idbde): """Stocke la photo n°``idbde`` dans un fichier""" photopath = _get_photopath(idbde) f = open(photopath, "w") f.write(photodata) f.close()
[docs]def _provide_photo(sock, idbde): """Fait en sorte que la photo n°``idbde`` soit disponible à son url.""" # Il est possible que quelqu'un ait déjà demandé à voir la photo (voire, moi-même), # auquel cas, le fichier est déjà présent et donc on s'en sert, # sinon, il faut le demander au serveur NK2015 photopath = _get_photopath(idbde) ask = True load_unknown = False # A priori on ne charge pas la photo inconnue # On a besoin de demander la photo au serveur, sauf si on la possède… if os.path.isfile(photopath): sock.write(json.dumps(["get_last_modified_photo", idbde])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): remote_timestamp = out["msg"] owned_timestamp = os.path.getmtime(photopath) if owned_timestamp >= remote_timestamp: # …et qu'elle est à jour ask = False else: load_unknown = True if ask and not load_unknown: sock.write(json.dumps(["get_photo", idbde])) out = nk.full_read(sock) if nk._is_success_code(out["retcode"]): photob64 = out["msg"] photo = base64.b64decode(photob64) # pour pouvoir servir la photo au client HTTP il faut l'enregistrer quelque part _store_photo(photo, idbde) else: load_unknown = True if load_unknown: # On n'a pas réussi à charger la photo, on met donc celle par défaut shutil.copy(settings.MEDIA_ROOT + "photo_unknown.png", photopath)
[docs]def _display_versions(): """Renvoie un string html-formaté des version de nginx et django.""" return "<address> %s <br/> Django/%s </address>\n" % basic._get_versions()
################################################################# ## Gestion commmune à presque toutes les pages ## #################################################################
[docs]def get_varsock(request, socket=False): """Fonction appelée pour générer quasiment toutes les pages (pas login). Elle récupère la socket dans :py:mod:`keep_alive` et vérifie que la session NK2015 n'a pas timeout. Renvoie ``(bool, sock_ou_response, variables_standard)`` avec les cas suivants : * ``(True, <une socket ouverte vers le serveur NK2015>, <un dico de variables de bases>)`` * ``(False, <un Http object utilisable>, {})`` * ``(True, None, <un dico de variables de base>)`` (dans le cas ``socket=False``) """ if request.session.get("logged", "no") == "ok": # Le login a réussi whoami = request.session["whoami"] variables_standard = basic._fundamental_variables() variables_standard["pages"] = request.session["pages"] variables_standard["whoami"] = whoami if socket: success, sock_ou_response = nk.socket_still_alive(request) if success: return (True, sock_ou_response, variables_standard) else: return (False, sock_ou_response, {}) else: return (True, None, variables_standard) else: # Le cookie Django a expiré ou est invalide messages.add_error(request, messages.ERRMSG_DJANGO_SESSION_EXPIRED) return (False, HttpResponseRedirect(nk._gen_redirect_postlogin(request)), {})
#: Correspondance (type d'id à convertir) -> (template message d'erreur en cas d'échec, url de fallback par défaut) #: #: La valeur incorrecte de l'id sera string-formattée dans le message d'erreur #: Le premier ``"%s"`` de l'url de fallback sera remplacée par :py:data:`settings.NOTE_ROOT_URL` #: #: Pour le cas particulier d'``"idinv"``, voir la documentation de :py:meth:`_cast_as` CAST_DICT = {"idact" : (messages.ERRMSG_IDACT_INVALID, '%sactivites/'), "idbde" : (messages.ERRMSG_IDBDE_INVALID, '%scomptes/'), "idinv" : (messages.ERRMSG_IDINV_INVALID, '%sactivites/%s%s/'), ## !! ## "idbouton" : (messages.ERRMSG_IDBUTTON_INVALID, '%sboutons/'), "preid" : (messages.ERRMSG_PREID_INVALID, '%sinscriptions/'), "idtransaction" : (messages.ERRMSG_IDTRANSACTION_INVALID, '%sconsos/'), "idremise" : (messages.ERRMSG_IDREMISE_INVALID, '%stresorerie/remises/'), }
[docs]def _cast_as(cast_as_what, id_to_cast, request, kwargs): """Pour transformer un id en entier et enregistrer une erreur dans la session en cas d'échec. * en cas de réussite, renvoie l'id * en cas d'échec, lève une erreur :py:class:`note.basic.IllegalId` qui contient dans son ``.fallback_with`` l'URL de redirection. Utilise :py:data:`note.views.CAST_DICT` pour connaître le message d'erreur à éventuellement afficher et l'url de redirection. * On fallback sur l'url du dico en remplaçant le premier ``"%s"`` par la valeur de :py:data:`settings.NOTE_ROOT_URL` * Dans le cas particulier de ``"idinv"`` : * Le deuxième ``"%s"`` est remplacé par ``kwargs["idact"]`` * Le troisième ``"%s"`` est remplacé par ``"/admin"`` si ``kwargs["admin"]`` est ``"/admin"`` """ errmsg, fallbackurl = CAST_DICT[cast_as_what] if kwargs.has_key("fallback"): fallbackurl = kwargs["fallback"] elif cast_as_what == "idinv": # Cas particulier de fallback pour les id d'invités fallbackurl = fallbackurl % (settings.NOTE_ROOT_URL, kwargs["idact"], "/admin" if kwargs.get("admin") else "") else: fallbackurl = fallbackurl % (settings.NOTE_ROOT_URL,) try: return int(id_to_cast) except: messages.add_error(request, errmsg % (id_to_cast)) raise IllegalId(fallbackurl)
[docs]def standard_page_withignores(ignore_casts=[]): """Décorateur pour la quasi-totalité des pages. S'occupe de caster les ids fournis dans l'url, récupère la socket et peuple les variables standard. Si l'id est précisé dans ``ignore_casts``, on n'affiche pas d'erreur même si il est vide. (on y met ``None``) On reçoit dans ``kwargs`` d'éventuels ids à caster. Lors du cast des ids ou la récupération de la socket, il peut y avoir une erreur. La fonction n'est alors pas appelée et une réponse d'erreur est envoyée. On appelle ensuite la fonction de la page ainsi ``func(request, sock, kwargs)``. * ``sock`` est une socket de communication vers le serveur NK2015. * On a ajouté à ``kwargs`` une clé ``"variables_standard"``, qui contient quelques valeurs utiles. Ce n'est nécessaire que pour peu de fonctions, ce qui explique que ce ne soit pas un argument explicite. * Si une erreur :py:class:`note.basic.NotFound` ou :py:class:`note.basic.IllegalId` est levée, elle est rattrappée et on redirige vers son ``.fallback_with`` """ def real_decorator(func): """On a besoin de faire du nesting pour que le décorateur puisse avoir un paramètre""" # On crée la fonction telle qu'elle sera réellement exécutée # En la wrappant pour conserver la docstring et le nom de la fonction @functools.wraps(func) def _decorated(request, *args, **kwargs): """On fait des choses avant d'appeler ``func``.""" # On caste les ids fournis en keyword argument, si il y en a for keyword in kwargs.keys(): if keyword in CAST_DICT.keys(): # Dans le cas où l'id n'est pas fournit, _cast_as va râler. # Mais dans certains cas, on ne veut pas que ça râle if kwargs[keyword] == "" and keyword in ignore_casts: kwargs[keyword] = None else: # On fournit ``kwargs`` à :py:meth:`_cast_as` parce qu'elle en a besoin pour le cas "idinv" try: kwargs[keyword] = _cast_as(keyword, kwargs[keyword], request, kwargs) except IllegalId as e: # Un _cast_as a échoué, on renvoie un redirect vers l'URL de fallback return HttpResponseRedirect(e.fallback_with) # On appelle la fonction standard, préalable à l'exécution de chaque page success, sock_ou_response, variables_standard = get_varsock(request, socket=True) if success: sock = sock_ou_response kwargs["variables_standard"] = variables_standard try: result = func(request, sock, kwargs) except NotFound as e: # Un _get_quelquechose a échoué, on renvoie un redirect vers l'URL de fallback return HttpResponseRedirect(e.fallback_with) if isinstance(result, HttpResponse): return result variables, template_name = result variables.update(variables_standard) response = render(request, template_name, variables) else: response = sock_ou_response return response return _decorated return real_decorator
#: Comme :py:meth:`standard_page_withignores` mais, justement sans ignores, #: parce que la plupart des pages n'en ont pas besoin. #: (C'est en fait le résultat de ``standard_page_withignores([])``) standard_page = standard_page_withignores() ############################################################################### ### Fonctions utilitaires pour l'application WEI ### ###############################################################################
[docs]def _get_dept_code(dept): """ Fonction transformant une chaine de caractères désignant un département d'enseignement en code, par exemple "Physique" -> "A2". Renvoie 'EXT' si le département est inconnu. """ if dept.lower() in [ 'info', 'informatique', 'a0', ]: code = 'A0' elif dept.lower() in [ 'math', 'maths', 'mathématiques', 'mathematiques', ]: code = 'A1' elif dept.lower() in [ 'physique', 'phys', 'a2', ]: code = 'A2' elif dept.lower() in [ 'phys. appliquée', 'physique appliquée', 'phys. appliquee', 'physique appliquee', 'phys appliquée', 'phys appliquee', ]: code = 'A\'2' elif dept.lower() in [ 'chimie', 'a"2', ]: code = 'A"2' elif dept.lower() in [ 'bio', 'biologie', 'a3', ]: code = 'A3' elif dept.lower() in [ 'saphire', 'b123', 'b1234', ]: code = 'B1234' elif dept.lower() in [ 'méca', 'mécanique', 'meca', 'mecanique', 'b1' ]: code = 'B1' elif dept.lower() in [ 'génie civil', 'genie civil', 'b2', ]: code = 'B2' elif dept.lower() in [ 'génie mécanique', 'genie mecanique', 'génie mecanique', 'genie mécanique', 'genie méca', 'génie méca', 'genie meca', 'genie méca', 'b3', ]: code = 'B3' elif dept.lower() in [ 'eea', 'electronique', 'genie electrique', 'génie éléctrique', 'b4', ]: code = 'B4' elif dept.lower() in [ 'design', 'c', ]: code = 'C' elif dept.lower() in [ 'eco-gestion', 'ecogestion', 'eco', 'gestion', 'd2', ]: code = 'D2' elif dept.lower() in [ 'sciences sociales', 'socio', 'd3', ]: code = 'D3' elif dept.lower() in [ 'anglais', 'anglaise', 'e', ]: code = 'E' else: code = 'EXT' return code