- """Downloads data from FAOSTAT website."""
- import hashlib
- import os
- import pathlib
- import time
- import zipfile
- from datetime import datetime
- import requests
- from bs4 import BeautifulSoup
- from selenium import webdriver
- from selenium.webdriver.chrome.options import Options
- from selenium.webdriver.chrome.service import Service
- from faostat_data_primap.exceptions import DateTagNotFoundError
- from faostat_data_primap.helper.definitions import domains
- from faostat_data_primap.helper.paths import downloaded_data_path


def find_previous_release_path(
    current_release_path: pathlib.Path,
) -> pathlib.Path | None:
    """
    Find the most recent previous release directory within the same domain.

    Release directories are assumed to be subdirectories within the same parent
    directory as `current_release_path`. Directory names must follow the
    naming convention YYYY-MM-DD so that releases can be ordered by date.

    Parameters
    ----------
    current_release_path : pathlib.Path
        The path of the current release directory.

    Returns
    -------
    pathlib.Path or None
        The path of the most recent previous release directory if one exists,
        otherwise None.
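
    Examples
    --------
    A sketch; the release folders shown are hypothetical and must exist
    on disk.

    >>> find_previous_release_path(
    ...     pathlib.Path("downloaded_data/emissions_totals/2024-11-09")
    ... )  # doctest: +SKIP
    PosixPath('downloaded_data/emissions_totals/2023-12-06')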
- """
- domain_path = current_release_path.parent
- all_releases = [
- release_name
- for release_name in os.listdir(current_release_path.parent)
- if (domain_path / release_name).is_dir()
- ]
- # make sure all directories follow the naming convention
- try:
- all_releases_datetime = [
- datetime.strptime(release, "%Y-%m-%d") for release in all_releases
- ]
- except ValueError as e:
- msg = (
- "All release folders must be in YYYY-MM-DD format, "
- f"got {sorted(all_releases)}"
- )
- raise ValueError(msg) from e
- all_releases_datetime = sorted(all_releases_datetime)
- current_release_datetime = datetime.strptime(current_release_path.name, "%Y-%m-%d")
- index = all_releases_datetime.index(current_release_datetime)
- # if the current release is the latest or the only one
- if index == 0:
- return None
- return domain_path / all_releases_datetime[index - 1].strftime("%Y-%m-%d")


def calculate_checksum(file_path: pathlib.Path) -> str:
    """
    Calculate the SHA-256 checksum of a file.

    Parameters
    ----------
    file_path : pathlib.Path
        The path to the file for which the checksum is calculated.

    Returns
    -------
    str
        The SHA-256 checksum of the file as a hexadecimal string.
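
    Examples
    --------
    The file name is only an illustration; the digest shown is the SHA-256
    of an empty file.

    >>> calculate_checksum(pathlib.Path("empty_file.zip"))  # doctest: +SKIP
    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'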
- """
- sha256 = hashlib.sha256()
- with open(file_path, "rb") as f:
- for chunk in iter(lambda: f.read(4096), b""):
- sha256.update(chunk)
- return sha256.hexdigest()


def download_methodology(url_download: str, save_path: pathlib.Path) -> None:
    """
    Download methodology file.

    Download the methodology PDF from a specified URL and save it to a
    target directory. If the file already exists in `save_path`, the
    download is skipped. If a previous release directory exists, the
    function looks for the same file there and compares checksums. If the
    file exists in the previous release and is identical, a symlink is
    created instead of saving a duplicate copy; if it differs or is missing
    there, the file is downloaded.

    Parameters
    ----------
    url_download : str
        The URL from which to download the file.
    save_path : pathlib.Path
        The path to the directory where the file should be saved.
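
    Examples
    --------
    A sketch with a hypothetical URL; `save_path` must be an existing
    release directory.

    >>> download_methodology(
    ...     "https://example.org/methodology.pdf",
    ...     pathlib.Path("downloaded_data/emissions_totals/2024-11-09"),
    ... )  # doctest: +SKIP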
- """
- filename = url_download.split("/")[-1]
- download_path = save_path / filename
- if download_path.exists():
- print(f"Skipping download of {download_path} because it already exists.")
- return
- previous_release = find_previous_release_path(save_path)
- # Attempt to find a file to compare in the previous release
- if previous_release:
- file_to_compare = previous_release / filename
- if file_to_compare.exists():
- response = requests.get(url_download, stream=True, timeout=30)
- response.raise_for_status()
- file_to_download_checksum = hashlib.sha256(response.content).hexdigest()
- file_to_compare_checksum = calculate_checksum(file_to_compare)
- if file_to_download_checksum == file_to_compare_checksum:
- print(
- f"File '{filename}' is identical in the previous release. "
- f"Creating symlink."
- )
- os.symlink(file_to_compare, download_path)
- return
- else:
- print(
- f"File '{filename}' differs from previous release. "
- f"Downloading file."
- )
- else:
- print(f"File '{filename}' not found in previous release. Downloading file.")
- response = requests.get(url_download, stream=True, timeout=30)
- response.raise_for_status()
- # Save downloaded file to current release
- with open(download_path, "wb") as f:
- f.write(response.content)
- else:
- print(f"No previous release found. Downloading file '{filename}'.")
- response = requests.get(url_download, stream=True, timeout=30)
- response.raise_for_status()
- with open(download_path, "wb") as f:
- f.write(response.content)


def get_html_content(url: str) -> BeautifulSoup:
    """
    Get html from url.

    Parameters
    ----------
    url
        The url to the domain overview website.

    Returns
    -------
        The parsed html content of the domain overview page.
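
    Examples
    --------
    A sketch; requires a local Chrome installation and network access. The
    URL shown is one of the domain overview pages configured in `domains`.

    >>> soup = get_html_content(
    ...     "https://www.fao.org/faostat/en/#data/GT"
    ... )  # doctest: +SKIP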
- """
- # If the chrome driver isn't found on your system PATH, Selenium
- # will automatically download it for you. Make sure there is no
- # chromedriver installed on your system.
- service = Service()
- options = Options()
- options.add_argument("--headless")
- driver = webdriver.Chrome(service=service, options=options)
- driver.get(url)
- # give time to load javascript
- time.sleep(5)
- html_content = driver.page_source
- return BeautifulSoup(html_content, "html.parser")


def get_last_updated_date(soup: BeautifulSoup, url: str) -> str:
    """
    Get the date when the data set was last updated from the html text.

    The FAOSTAT domain overview page includes a date when
    the data set was last updated. We need it to label our downloaded
    data sets. This function searches for and extracts the date
    from the html code.

    Parameters
    ----------
    soup
        The beautiful soup object with all html code of the domain
        overview page.
    url
        The url to the domain overview page.

    Returns
    -------
        The date when the data set was last updated, in YYYY-MM-DD format.

    Raises
    ------
    DateTagNotFoundError
        If the tag for the date is not found in the html code
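
    Examples
    --------
    A minimal example with a hand-built snippet instead of a real
    overview page:

    >>> soup = BeautifulSoup(
    ...     '<p data-role="date">November 9, 2024</p>', "html.parser"
    ... )
    >>> get_last_updated_date(soup, "https://example.org")
    '2024-11-09'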
- """
- date_tag = soup.find("p", {"data-role": "date"})
- if not date_tag:
- raise DateTagNotFoundError(url=url)
- last_updated = date_tag.get_text()
- last_updated = datetime.strptime(last_updated, "%B %d, %Y").strftime("%Y-%m-%d")
- return last_updated


def download_file(url_download: str, save_path: pathlib.Path) -> bool:
    """
    Download file.

    If an existing file is found at this location, the download is skipped.

    Parameters
    ----------
    url_download
        Remote URL to download the file from
    save_path
        Path to save the downloaded file to

    Returns
    -------
        True if the file was downloaded, False if a cached file was found
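
    Examples
    --------
    A sketch with a hypothetical download URL and target path:

    >>> download_file(
    ...     "https://example.org/Emissions_Totals_E_All_Data.zip",
    ...     pathlib.Path("downloaded_data/emissions_totals/2024-11-09/data.zip"),
    ... )  # doctest: +SKIP
    True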
- """
- if not save_path.exists():
- with requests.get(url_download, stream=True, timeout=30) as response:
- response.raise_for_status()
- with open(save_path, mode="wb") as file:
- file.write(response.content)
- return True
- else:
- print(f"Skipping download of {save_path}" " because it already exists.")
- return False


def unzip_file(local_filename: pathlib.Path) -> list[str]:
    """
    Unzip the archive into the same directory.

    Files that already exist next to the archive are skipped.

    Parameters
    ----------
    local_filename
        Path to the zip file

    Returns
    -------
        List of unzipped files
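
    Examples
    --------
    A sketch with a hypothetical archive; member names depend on the
    downloaded data set.

    >>> unzip_file(
    ...     pathlib.Path("downloaded_data/emissions_totals/2024-11-09/data.zip")
    ... )  # doctest: +SKIP
    ['Emissions_Totals_E_All_Data_NOFLAG.csv']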
- """
- unzipped_files = []
- if local_filename.suffix == ".zip":
- try:
- with zipfile.ZipFile(str(local_filename), "r") as zip_file:
- for file_info in zip_file.infolist():
- extracted_file_path = local_filename.parent / file_info.filename
- if extracted_file_path.exists():
- print(
- f"File '{file_info.filename}' already exists. "
- f"Skipping extraction."
- )
- else:
- print(f"Extracting '{file_info.filename}'...")
- zip_file.extract(file_info, local_filename.parent)
- unzipped_files.append(local_filename.name)
- # TODO Better error logging/visibilty
- except zipfile.BadZipFile:
- print(f"Error while trying to extract " f"{local_filename}")
- except NotImplementedError:
- print("Zip format not supported, " "please unzip on the command line.")
- else:
- print(f"Not attempting to extract " f"{local_filename}.")
- return unzipped_files


def download_all_domains(
    domains: dict[str, dict[str, str]] = domains,
    downloaded_data_path: pathlib.Path = downloaded_data_path,
) -> list[str]:
    """
    Download and unpack all climate-related domains from the FAOSTAT website.

    Extract the date when the data set was last updated and create a directory
    with the same name. Download the zip file for each domain if
    it does not already exist. Unpack the zip file and save it in
    the same directory.

    Parameters
    ----------
    domains
        Mapping of data set name to the urls of the domain overview,
        the data download, and the methodology file.
    downloaded_data_path
        The directory where downloaded data sets are stored.

    Returns
    -------
        List of input files that have been fetched or found locally.
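
    Examples
    --------
    Download everything configured in `domains` (requires network access
    and a local Chrome installation):

    >>> downloaded_files = download_all_domains()  # doctest: +SKIP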
- """
- downloaded_files = []
- for ds_name, urls in domains.items():
- url = urls["url_domain"]
- url_download = urls["url_download"]
- url_methodology = urls["url_methodology"]
- soup = get_html_content(url)
- last_updated = get_last_updated_date(soup, url)
- if not downloaded_data_path.exists():
- downloaded_data_path.mkdir()
- ds_path = downloaded_data_path / ds_name
- if not ds_path.exists():
- ds_path.mkdir()
- local_data_dir = ds_path / last_updated
- if not local_data_dir.exists():
- local_data_dir.mkdir()
- download_methodology(save_path=local_data_dir, url_download=url_methodology)
- local_filename = local_data_dir / f"{ds_name}.zip"
- download_file(url_download=url_download, save_path=local_filename)
- downloaded_files.append(str(local_filename))
- unzip_file(local_filename)
- return downloaded_files