# Source code for powersimdata.network.zenodo

import hashlib
import json
import os
import shutil
from contextlib import contextmanager
from zipfile import ZipFile

import requests
from tqdm import tqdm

# Base endpoint of the Zenodo REST API; a record id is appended to fetch a record.
url = "https://zenodo.org/api/records/"


class Zenodo:
    """Get data from a Zenodo archive.

    :param str record_id: zenodo record id
    """

    def __init__(self, record_id):
        """Constructor"""
        self.record_id = record_id
        self.content = self._get_record_content()

    def _get_record_content(self):
        """Make HTTP request to zenodo API and retrieve content.

        :return: (*dict*) -- content of the response in unicode.
        :raises ConnectionError: if connection times out.
        :raises ValueError: if record is invalid.
        """
        try:
            r = requests.get(url + self.record_id, timeout=10)
        except requests.exceptions.ConnectTimeout:
            raise ConnectionError("Connection to zenodo.org timed out")
        if not r.ok:
            raise ValueError(f"Record could not be accessed. Status: {r.status_code}")
        content = json.loads(r.text)
        metadata = content["metadata"]
        self.version = metadata["version"]
        print(f"Title: {metadata['title']}")
        print(f"Publication date: {metadata['publication_date']}")
        print(f"Version: {self.version}")
        print(f"DOI: {metadata['doi']}")
        return content

    def _get_remote_checksum(self, f):
        """Get checksum of the remote copy of a file.

        :param dict f: dictionary containing information on the remote copy of
            a file.
        :return: (*str*) -- checksum.
        """
        # Zenodo reports checksums as "<algorithm>:<hexdigest>".
        return f["checksum"].split(":")[1]

    def _get_local_checksum(self, f):
        """Get checksum of the local copy of a file.

        :param dict f: dictionary containing information on the local copy of
            a file.
        :return: (*str*) -- checksum if file exists, ``"invalid"`` otherwise.
        """
        filename = os.path.join(self.dir, f["key"])
        if not os.path.exists(filename):
            # Sentinel guaranteed to differ from any real hex digest, so a
            # missing file always triggers a (re-)download in load_data.
            return "invalid"
        else:
            # Use the same hash algorithm that Zenodo used for the remote copy.
            h = hashlib.new(f["checksum"].split(":")[0])
            with open(filename, "rb") as file:
                data = file.read()
                h.update(data)
            return h.hexdigest()

    @contextmanager
    def _change_dir(self):
        """Temporarily change the working directory to ``self.dir``.

        Restores the previous working directory on exit, even on error.
        """
        work_dir = os.getcwd()
        os.chdir(os.path.expanduser(self.dir))
        try:
            yield
        finally:
            os.chdir(work_dir)

    def _download_data(self, f):
        """Fetch data.

        :param dict f: information on the file to download.
        """
        # Stream the download in chunks and report progress with tqdm.
        with requests.get(f["links"]["self"], stream=True) as r:
            r.raise_for_status()
            with open(f["key"], "wb") as file:
                with tqdm(
                    unit="B",
                    unit_scale=True,
                    unit_divisor=1024,
                    miniters=1,
                    total=f["size"],
                ) as pbar:
                    for chunk in r.iter_content(chunk_size=8192):
                        file.write(chunk)
                        pbar.update(len(chunk))

    def _delete_data(self, f):
        """Delete data.

        :param dict f: information on the file to delete.
        """
        os.remove(f["key"])
        if f["type"] == "zip":
            # Remove the directory extracted from the archive ("<name>.zip"
            # extracts to "<name>"), if it exists.
            extracted = f["key"][:-4]
            if os.path.isdir(extracted):
                shutil.rmtree(extracted)

    def _unzip_data(self, f):
        """Unzip data.

        :param dict f: information on the file to unzip.
        """
        if f["type"] == "zip":
            with ZipFile(f["key"], "r") as file:
                file.extractall()

    def load_data(self, model_dir):
        """Download file(s)

        :param str model_dir: path to directory of the grid model.
        :raises FileNotFoundError: if ``model_dir`` does not exist.
        """
        if not os.path.isdir(model_dir):
            raise FileNotFoundError(f"{model_dir} does not exist")
        else:
            version = self.content["metadata"]["version"]
            self.dir = os.path.join(model_dir, f"data_{version}")
            try:
                # First-time download: create the versioned directory and
                # fetch everything.
                os.mkdir(self.dir)
                for f in self.content["files"]:
                    with self._change_dir():
                        self._download_data(f)
                        self._unzip_data(f)
            except FileExistsError:
                # Directory already exists: only re-download files whose
                # local checksum no longer matches the remote one.
                for f in self.content["files"]:
                    if self._get_local_checksum(f) != self._get_remote_checksum(f):
                        with self._change_dir():
                            self._delete_data(f)
                            self._download_data(f)
                            self._unzip_data(f)
                    else:
                        print(f"{f['key']} has been downloaded previously")