#! /usr/bin/env python # -*- coding: Latin1 -*- import BaseHTTPServer import cgi import math import os import re import stat # der Server lauscht auf diesem Port PORT = 9000 # Plattenplatz wird auf Einheiten so vieler Bytes aufgerundet BLOCKSIZE = 1024 # zeige zusätzliche Ebenen bis zu dieser Tiefe an; 1 steht für # die unmittelbaren Unterverzeichnisse des durch den URL # ausgewählten Verzeichnisses DEPTH = 1 class PathError(Exception):     pass class Path:     def __init__(self, path):         # mache den Pfad absolut und entferne ggf. einen / am Ende         path = os.path.abspath(path)         # Fehlerprüfung         if not os.path.exists(path):             raise PathError("Pfad %s nicht gefunden" % path)         if not os.path.isdir(path):             raise PathError("Pfad %s ist kein Verzeichnis" % path)         stat_result = os.stat(path)         self._device = stat_result.st_dev         self._path = path         # Schlüssel sind Verzeichnispfade, Werte sind Gesamt-Bytes         # (einschließlich Unterverzeichnisse)         self._usage_cache = {}         # speichere Pfade mit Hardlinks hier         self._hardlinked_paths = {}     def _is_root_dir(self, path):         # sollte auch für "DOS-artige" Betriebssysteme (mit Laufwerks-         # buchstaben) funktionieren         return os.path.splitdrive(path)[1] == os.sep     def _grouped_int(self, number):         """         Gib einen String zurück, bei dem der Integer-Wert 'number', der die Dateigröße darstellt, zur besseren Übersicht in         Dreiergruppen unterteilt ist.         Beispiel: Aus 1234567 wird 1 234 567.         """         grouped_int = str(number)         # fülle von links mit Leerzeichen auf, so dass die Länge des         # Strings durch Drei teilbar wird         while len(grouped_int) % 3 != 0:             grouped_int = " " + grouped_int         groups = []         for index in range(0, len(grouped_int), 3):             groups.append(grouped_int[index:index+3])         return " ".join(groups).lstrip()     def _filter(self, dir_path, dirnames, filenames):         """         Verändere 'dirname' und 'filenames' direkt, so dass weder         Symlinks verfolgt, noch Mountpoints überquert werden.         """         # keine Liste ändern, über die wir iterieren         for dirname in dirnames[:]:             absolute_path = os.path.join(dir_path, dirname)             if os.path.islink(absolute_path) or \               os.lstat(absolute_path).st_dev != self._device:                 # merke den Namen, aber ignoriere das Verzeichnis                 self._usage_cache[absolute_path] = 0                 dirnames.remove(dirname)         # keine Liste ändern, über die wir iterieren         for filename in filenames[:]:             absolute_path = os.path.join(dir_path, filename)             if os.path.islink(absolute_path):                 filenames.remove(filename)     def disk_usage(self):         """         Gib den belegten Plattenplatz für diesen Pfad in Form eines         Dictionaries zurück. Die Schlüssel sind absolute Pfade, die         Werte sind Größen in Bytes.         Symlinks werden nicht verfolgt. Falls ein Pfad mehrere         Hardlinks besitzt, zähle den belegten Platz nur einmal.         """         for dir_path, dirnames, filenames in os.walk(self._path):             # filtere in-place, um Symlinks und Verzeichnisse/Dateien E             #  auf anderen Mountpoints auszuschließen             self._filter(dir_path, dirnames, filenames)             bytes_in_dir_path = 0             for path in (dirnames + filenames):                 absolute_path = os.path.join(dir_path, path)                 stat_result = os.lstat(absolute_path)                 # zähle Pfade mit mehreren Hardlinks nur einmal                 if stat_result.st_nlink > 1:                     if stat_result.st_ino in self._hardlinked_paths:                         # diesen Pfad hatten wir schon; ignoriere ihn                         continue                     else:                         # speichere den Pfad für spätere Kontrollen                         self._hardlinked_paths[stat_result.st_ino] = True                 bytes = stat_result.st_size                 rounded_bytes = int(math.ceil(float(bytes)/BLOCKSIZE) *                                     BLOCKSIZE)                 bytes_in_dir_path += rounded_bytes             self._usage_cache[dir_path] = bytes_in_dir_path             # füge den berechneten Wert für 'bytes_in_dir_path' allen             #  Verzeichnissen "oberhalb" von diesem hinzu             while not self._is_root_dir(dir_path):                 dir_path = os.path.dirname(dir_path)                 try:                     self._usage_cache[dir_path] += bytes_in_dir_path                 except KeyError:                     pass         return self._usage_cache     def disk_usage_html(self):         """Gib eine HTML-Tabelle für den belegten Platz zurück."""         disk_usage = self.disk_usage()         starting_depth = len(self._path.split(os.sep))         escaped_path = cgi.escape(self._path)         html_parts = [           'Plattenplatz für %s' %             escaped_path,           '',           '

Plattenplatz für %s

' % escaped_path,           '',           'PfadGröße in KB']         paths = disk_usage.keys()         paths.sort()         for path in paths:             depth = len(path.split(os.sep))             if depth - starting_depth > DEPTH:                 continue             escaped_path = cgi.escape(path)             size_in_kb = int(round(disk_usage[path] / 1024.))             html_parts.append(               '%s%s' %               (escaped_path, escaped_path, self._grouped_int(size_in_kb)))         html_parts.append('')         return "\n".join(html_parts) class DiskUsageHandler(BaseHTTPServer.BaseHTTPRequestHandler):     def _translate_path(self, path):         """Gib den Pfad mit decodierten %hh-Sequenzen zurück."""         # für Nicht-Posix-Betriebssysteme         path = path.replace('/', os.sep)         replace_function = lambda match: chr(int(match.group(0)[1:], 16))         return re.sub(r"%[A-Fa-f\d]{2}", replace_function, path)     def send_content(self, content, content_type="text/plain", status=200):         self.send_response(status)         self.send_header("Content-type", content_type)         self.send_header("Content-length", str(len(content)))         self.end_headers()         self.wfile.write(content)     def do_GET(self):         self.path = self._translate_path(self.path)         try:             output = Path(self.path).disk_usage_html()         except PathError, path_error:             self.send_content(str(path_error), status=400)         else:             self.send_content(output, content_type="text/html") def main():     server = BaseHTTPServer.HTTPServer(('', PORT), DiskUsageHandler)     server.serve_forever() if __name__ == '__main__':     try:         main()     except KeyboardInterrupt:         print "\nBeenden ..."