Postez ici vos trucs & astuces.
Répondre

Outil de surveillance utilisateur ?

#1Messageil y a 3 semaines

Bonjour à tous,
Enseignant et référent numérique d'un Lycée pro, je déploie 44 postes portables sous la forme de classes mobiles (dans des chariots). Les PC choisis sont des Asus pour pouvoir se faire facilement rembourser la licence Windows. Manjaro est installé sur ces 44 PC.

Deux classes mobiles sont déjà en place depuis 2 ans mais je galère à chaque fois qu'il est nécessaire de fouiller dans les logs. Comme je dois mettre en service une troisième classe mobile, j'en profite pour repartir d'une image système propre et je m'interrogeais sur la possibilité de trouver un utilitaire permettant d'avoir un accès plus facile et plus lisible aux logs systèmes, tout particulièrement ceux liés à l'utilisateur.

Toutes les recherches Google que j'ai pu faire n'ont mené à rien d'intéressant (Timepskr, CTparental...).
Il me faudrait simplement un petit outil capable de filtrer les logs système d'un utilisateur en local, en cas de problème.

Une idée ? :gsourire:

Merci !

PS: Quand l'image système sera terminée je la partagerai si ça peut inspirer d'autres personnes !

Outil de surveillance utilisateur ?

#2Messageil y a 3 semaines

bonjour
R26V8RS26 a écrit : il y a 3 semaines utilitaire permettant d'avoir un accès plus facile et plus lisible aux logs systèmes, tout particulièrement ceux liés à l'utilisateur.
Toutes les recherches Google que j'ai pu faire n'ont mené à rien d'intéressant (Timepskr, CTparental...).
simplement un petit outil capable de filtrer les logs système d'un utilisateur en local
Tu nous donnes 2 utilitaires qui n'ont pas de rapport avec les logs (pour consultation) ?

Tu parles des logs systemd ?

Si tu recherches une interface graphique, existe pour kde KSystemLog et Journaux pour gnome

En console, journalctl n'est pas particulièrement compliqué, de plus on peut générer une sortie json (globale) puis avec un petit script de notre cru, on peut facilement personnaliser la sortie.

Puisque les écoles donnent maintenant des cours python, un exemple simpliste :

import sys
import subprocess
import json
from datetime import datetime

def sh(cmd:str):
    rst = subprocess.run('SYSTEMD_COLORS=xterm '+cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if rst.returncode != 0:
        print("Error:", rst.stderr, file=sys.stderr)
        exit(2)
    return rst.stdout

cmd = f'journalctl -p3 -b0 --output-fields="__REALTIME_TIMESTAMP,PRIORITY,_HOSTNAME,SYSLOG_IDENTIFIER,MESSAGE,_EXE" -o json --no-pager'
print(cmd)

jstr = sh(cmd).replace("}\n{", '},\n{')     # recup retour de la commande journalctl

for item in json.loads('[' + jstr + ']'):
    if item['SYSLOG_IDENTIFIER'] in ["kernel", "bluetoothd"]:
        continue
    try:
        dt_object = datetime.fromtimestamp(int(item["__REALTIME_TIMESTAMP"][0:10]))
        print(f"{str(dt_object)}   {item['PRIORITY']:2} {item['SYSLOG_IDENTIFIER']:18} \33[32m{item['MESSAGE']}\33[0m")
    except KeyError as err:
        print("ERROR:", err, "\n", item, "\n", file=sys.stderr)
Autre code, mais plus complexe en python, qui essaye de "regrouper" les erreurs par services/messages d'erreur.
# but est de trouver l'erreur/service qui pollue le plus mes logs, pas ton problème mais montre qu'il est même possible de générer une synthèse des logs.

import json
import subprocess
from datetime import datetime
import sys
import os
from enum import IntEnum
import argparse
import textwrap


class Levels(IntEnum):
    emergency = 0
    alerte = 1
    critique = 2
    error = 3
    warning = 4
    notice = 5
    info = 6
    debug = 7


def theme(color: str, string: str, use: bool = True) -> str:
    if not use:
        return string
    match color:
        case 'error' | 'critique' | 'alerte' | 'emergency':
            return f"\33[31m{string}\33[0m"
        case 'warning':
            return f"\33[33m{string}\33[0m"
        case 'hi':
            return f"\33[32m{string}\33[0m"
    return string

class EdgeDecoder(json.JSONDecoder):
    TO_RM = (
        '_MACHINE_ID', '__MONOTONIC_TIMESTAMP', 'SYSLOG_TIMESTAMP', '_SYSTEMD_INVOCATION_ID',
        '_AUDIT_LOGINUID', 'USER_INVOCATION_ID', '_CAP_EFFECTIVE', '_SYSTEMD_SLICE', '_TRANSPORT',
        '_SOURCE_REALTIME_TIMESTAMP', '_AUDIT_SESSION', '_HOSTNAME', 'TID', '_SYSTEMD_CGROUP',
        'QT_CATEGORY', '_CMDLINE', 'CODE_FILE', '_EXE', '_SYSTEMD_USER_SLICE',
        'JOB_RESULT', 'CODE_FUNC', '_SYSTEMD_OWNER_UID', '_RUNTIME_SCOPE', '__CURSOR'
    )

    def __init__(self, *args, **kwargs):
        json.JSONDecoder.__init__(self, object_hook=self.object_hook, *args, **kwargs)

    def object_hook(self, dct):
        dct['__REALTIME_TIMESTAMP'] = datetime.fromtimestamp(int(dct["__REALTIME_TIMESTAMP"][0:10]))
        dct['PRIORITY'] = Levels(int(dct['PRIORITY']))
        if "MESSAGE" in dct and dct['MESSAGE']:
            dct['MESSAGE'] = dct['MESSAGE'].replace("\n", " ")
        else:
            dct['MESSAGE'] = ""
        for item in self.TO_RM:
            if dct.get(item):
                dct.pop(item)
        #print(dct)
        return dct

def group(logs, key: str, default="-") -> dict[str, int]:
    ret = {}
    for entry in logs:
        if value := entry.get(key, default):
            ret.update({value: ret.get(value, 0)+1})
    return ret

def group_messages(logs, _comm: str, level=-1) -> dict[str, int]:
    ret = {}
    for entry in logs:
        if entry.get('_COMM', '') != _comm:
            continue
        if level > -1 and entry.get('PRIORITY') != level:
            continue
        value = ' '.join(str(entry.get('MESSAGE', '')).split(maxsplit=6)[0:5])
        ret.update({value: ret.get(value, 0)+1})
    return ret

def group_days(logs, _comm: str='', level=-1) -> dict[str, int]:
    ret = {}
    for entry in logs:
        if _comm and entry.get('_COMM', '') != _comm:
            continue
        if level > -1 and entry.get('PRIORITY') != level:
            continue
        value = f"{entry.get('__REALTIME_TIMESTAMP'):%Y-%m-%d}"
        ret.update({value: ret.get(value, 0)+1})
    return ret

def max_group(datas: dict) -> tuple[str, int]:
    maxi = ('', 0)
    for k, value in datas.items():
        if value > maxi[1]:
            maxi = (k, value)
    return maxi

def get_nb_boots() -> int:
    try:
        return int(subprocess.check_output("journalctl --list-boots -q | head -1", shell=True, text=True).split(maxsplit=1)[0])
    except (subprocess.CalledProcessError, ValueError):
        return -12


parser = argparse.ArgumentParser(
    prog='journalctl group',
    description='Group entries in systemd logs'
)
parser.add_argument('-b', '--boot', help='relative boot',
    choices=[str(b) for b in range(get_nb_boots(), 1)],
    default=''
)

parser.add_argument('-p', '--priority',
    help='Message levels (emergency..debug)  -  add `.` prefix for only one level',
    choices=list(str(p) for p in range(Levels.emergency.value, Levels.debug.value+1)) + [f".{p}" for p in range(Levels.emergency.value, Levels.debug.value+1)],
    default=str(Levels.debug.value)
)
parser.add_argument('-l', '--list', action='store_true', help='Display messages')
parser.add_argument('-v', '--verbose', action='store_true')
# voir /home/patrick/workspace/go/pacman-log-go/pacman-log-go/pacman-log-go -a
parser.add_argument('-c', '--calandar', action='store_true', help='group by days')
parser.add_argument('apps', metavar='+Application',
    type=str,
    nargs= argparse.REMAINDER,
    help='App(s) list to display'
)
args = parser.parse_args()
if args.boot:    args.boot = f"-b {args.boot}"
if args.priority:
    args.priority = f"-p {args.priority[1:]}..{args.priority[1:]}" if '.' in args.priority else f"-p {args.priority}"


# On récupère le retour de la commande journalctl
# --output-fields="__REALTIME_TIMESTAMP,PRIORITY,_HOSTNAME,SYSLOG_IDENTIFIER,MESSAGE,_EXE, _COMM, _UID, USER_INIT, _SYSTEMD_USER_UNIT"
cmd = f"journalctl {args.boot} {args.priority} -o json"
print('#\n#', cmd, end="\n#\n")
output = subprocess.check_output(
    cmd,
    shell=True,
    text=True).replace('}\n{', '},\n{'
)
output = f"[{output}]"
#print(output)
# On convertit le retour en json
data = json.loads(output, cls=EdgeDecoder)


def trunc_middle(text:str, width:int, placeholder="…") -> str:
    if not text:
        return ''
    if len(text) <= width:
        return text
    while len(text) > width-len(placeholder):
        middle = len(text) // 2
        if len(text) % 2:
            middle += 1
        text = text[0:middle-1] + text[middle:]
        #print('->', text, len(text))
    middle = len(text) // 2
    return text[0:middle] + placeholder + text[middle:]


def com_unit_merge(entry: dict, width=24) -> str:
    unit = entry.get('USER_UNIT', '') if entry.get('USER_UNIT') else entry.get('_SYSTEMD_USER_UNIT')
    if unit and len(unit) > 26:
        unit = trunc_middle(unit, 24)
    unit = f"({unit})" if unit else ''
    com = f"{entry.get('_COMM', 'Kernel')} {unit}"
    return f"{com:{width}s}"


COLUMNS = os.get_terminal_size().columns
w = COLUMNS - 2 - 14 -4 - (14+26)   # ~=

if '-l' in sys.argv:
    for entry in data:
        print(
            theme(Levels(int(entry.get('PRIORITY', 7))).name, f"{entry.get('PRIORITY'):2}"),
            f"{entry.get('__REALTIME_TIMESTAMP'):%m-%d %H:%M:%S} ",
            f"{com_unit_merge(entry, 12)}",
            theme('hi', textwrap.shorten(entry.get('MESSAGE'), w, placeholder="…")),
            )
    print()

if datas := group(data, 'PRIORITY'):
    print("# Niveau : Somme des messages")
    for key in sorted(datas.keys()):
        print(f"  {theme('ho', key.name.upper()):8} ({theme(key.name, key.value)}) :", datas[key])
else:
    print('No messages found')
    exit(1)

maxi = max_group(datas)

print()
print(f"#Niveau le plus grand est {theme('hi', Levels(maxi[0]).name.upper())}({theme(maxi[0].name, maxi[0])}) avec {maxi[1]} entrées :")
print()
print(f"# Applications {Levels(maxi[0]).name.upper()}({theme(maxi[0].name, maxi[0])})")
datas = group(data, '_COMM')
maxi_comm = ''
i = 0
for key, val in sorted(datas.items(), key=lambda kv:(kv[1], kv[0]), reverse=True):
    if not maxi_comm and key != '-':
        maxi_comm = key
    print(f"{val:6} : {theme('hi', key)}")
    i += 1
    if i > 7:
        break

print()
print(f"# Services {Levels(maxi[0]).name.upper()}({theme(maxi[0].name, maxi[0])})")
datas = group(data, 'USER_UNIT', "?")
i = 0
for key, val in sorted(datas.items(), key=lambda kv:(kv[1], kv[0]), reverse=True):
    if val > 2 and key != "?":
        print(f"{val:6} : {theme('hi', key)}")
    i += 1
    if i > 7:
        break

print()

print("# Messages de ", theme('hi', maxi_comm), 'niveau ', f"{maxi[0].name.upper()}({theme(maxi[0].name, maxi[0].value)})")
datas = group_messages(data, maxi_comm, maxi[0].value)
i = 0
for key, val in sorted(datas.items(), key=lambda kv:(kv[1], kv[0]), reverse=True):
    if val > 2 and key != "?":
        print(f"{val:6} : {theme('hi', key)}")
    i += 1
    if i > 12:
        break

print()
for comm in args.apps:
    for level in Levels:
        #print(comm, level.name.upper(), '...')
        datas = group_messages(data, comm, level)
        if not datas:
            continue
        print()
        print("# Messages pour ", theme('hi', comm), 'niveau ', theme(level.name, level.value), level.name.upper())
        i = 0
        for key, val in sorted(datas.items(), key=lambda kv:(kv[1], kv[0]), reverse=True):
            if val > 0 and key != "?":
                print(f"{val:6} : {theme('hi', key)}")
            i += 1
            if i > 12:
                break

print()
if args.calandar:
    if days := group_days(data):
        if len(days) > 1:
            maxi = max(days.values())
            print("# Messages par dates ", args.priority)
            for key, val in days.items():
                pourcent = val * 100 // maxi
                val = f"{val:7}"
                print(f"{key} {theme('hi', val)} {'█'* pourcent}")

print()

if args.verbose and entry:
    print(entry)
ps: existe une lib python-systemd dans nos repos non utilisée ici qui permet de lire les logs directement

Outil de surveillance utilisateur ?

#3Messageil y a 3 semaines

Bonjour Papajoke et merci pour ta réponse rapide.
papajoke a écrit : il y a 3 semaines Tu nous donnes 2 utilitaires qui n'ont pas de rapport avec les logs (pour consultation) ?
Comme je l'ai dit, quand je fais des recherches google concernant la surveillance d'utilisateur, j'obtiens des résultats comme ceux pré-cités (ceci afin de prévenir que j'ai fait ma part de recherche, sans succès), qui ne correspondent effectivement pas à ma demande.
papajoke a écrit : il y a 3 semaines Tu parles des logs systemd ?
Probablement, à vrai dire je ne comprends plus rien depuis le passage à systemd. Du coup, je ne sais même plus où chercher les logs utilisateurs.
Je sais qu'ils existent car il m'est arrivé d'avoir à les chercher,
papajoke a écrit : il y a 3 semaines Si tu recherches une interface graphique, existe pour kde KSystemLog et Journaux pour gnome
Je suis sur KDE, j'ai donc regardé kSystemLog. Pas mal mais journald semble juste évoquer les lancements d'application (et les erreurs liées). Si une modification du système a été réalisée par un utilisateur (par exemple, modification du bureau, changement de fond d'écran etc), comment retracer ces actions ?
papajoke a écrit : il y a 3 semaines En console, journalctl n'est pas particulièrement compliqué
En effet, sur le principe la commande

journalctl _UID=1002 --since today
par exemple pourrait me suffire. Le problème c'est que je n'arrive pas à voir autre chose que les programmes lancés et quittés. J'aurais aimé pouvoir retracer toutes les actions d'un utilisateur (par exemple: dolphin a été lancé, mais qu'a-t-il fait dessus?)
Peut-être existe-t-il des logs par application?

Merci pour ton aide et pour le partage des scripts :sourire:
Répondre