import json from gzip import GzipFile from os.path import basename, join from urllib.error import URLError from urllib.parse import urlparse from urllib.request import Request, urlopen class UrlReader(object): def __init__(self, _url): """ Erzeugt einen neuen UrlReader für gegebene url :param _url: zu öffnende URL """ self.url = _url def open_url(self): """ Öffnet eine URL-Verbindung, fragt GZIP-Kompression an und gibt das Response-Objekt zurück :return: Response-Objekt oder None im Fehlerfall """ print("open_url: url: \"%s\"" % (self.url,)) try: # Versuche Verbindung zu öffnen request = Request(self.url) request.add_header('Accept-Encoding', 'gzip') response = urlopen(request) print("open_url: Verbindung hergestellt") return response except URLError as e: # auch HTTPError print("open_url: FEHLER: " + str(e)) return None def get_data_response(self): """ Benutzt open_url und gibt die (entpackten) Daten zurück. :return: (entpackte) Daten oder None im Fehlerfall """ print("get_data_response: url: \"%s\"" % (self.url,)) response = self.open_url() if response is None: #FEHLER: Kein Response-Objekt erhalten print("get_data_response: FEHLER: Kein Response-Objekt erhalten") return None try: if response.headers['Content-Encoding'] == 'gzip': # Empfange GZIP Daten... print("get_data_response: Empfange GZIP Daten...") daten = GzipFile(fileobj=response).read() else: # Empfange unkomprimierte Daten... print("get_data_response: Empfange unkomprimierte Daten...") daten = response.read() print("get_data_response: Daten empfangen") return daten except OSError as e: print("get_data_response: FEHLER: " + str(e)) return None def get_json_response(self): """ Benutzt get_data_response zum Herunterladen, interpretiert die Daten als JSON und gibt das Ergebnis zurück. :return: Geparste JSON Daten oder None im Fehlerfall """ print("get_json_response: url=" + self.url) daten = self.get_data_response() if daten is None: # FEHLER: Keine Daten erhalten print("get_json_response: FEHLER: Keine Daten erhalten") return None try: # Versuche JSON zu lesen print("get_json_response: Lese JSON...") parsed = json.loads(daten) print("get_json_response: JSON gelesen") return parsed except ValueError as e: # JSONDecodeError print("get_json_response: ValueError: " + str(e)) except TypeError as e: # JSONDecodeError print("get_json_response: TypeError: " + str(e)) return None # Kein Erfolg def _dateiname_von_url(self): """ Extrahiert den Dateinamen aus einer der URL """ result = urlparse(self.url) dateiname = basename(result.path) return dateiname def get_file_response(self, pfad): """ Benutzt get_data_response zum Herunterladen, schreibt die Daten in eine Datei und gibt ihren Pfad zurück (gegebenes Verzeichnis + basename des URL-Pfades). :param pfad: Verzeichnis in dem die Datei gespeichert werden soll. :return: Pfad der erzeugten Datei oder None im Fehlerfall """ print("get_file_response: url: \"%s\"" % (self.url,)) print("get_file_response: pfad: \"%s\"" % (pfad,)) daten = self.get_data_response() if daten is None: print("get_file_response: FEHLER: Keine Daten erhalten") return None dateiname = self._dateiname_von_url() print("get_file_response: dateiname: \"%s\"" % (dateiname,)) dateipfad = join(pfad, dateiname) print("get_file_response: dateipfad: \"%s\"" % (dateipfad,)) try: print("get_file_response: Schreibe Datei...") with open(dateipfad, 'wb') as datei: datei.write(daten) print("get_file_response: Datei geschrieben") return dateipfad except OSError as e: print("getFilesResponse: FEHLER: " + str(e)) return None if __name__ == '__main__': url = "https://ia800302.us.archive.org/8/items/BennyGoodmanQuartetAndTrio/BodySoul-BennyGoodmanGeneKrupaTeddyWilsoncarnegieHall1938_64kb.mp3" print(UrlReader(url).get_file_response(""))