- """Downloads data from FAOSTAT website."""
- import os
- import pathlib
- import time
- import zipfile
- from datetime import datetime
- import requests
- from bs4 import BeautifulSoup
- from selenium import webdriver
- from selenium.webdriver.chrome.options import Options
- from selenium.webdriver.chrome.service import Service
- from faostat_data_primap.exceptions import DateTagNotFoundError
- from faostat_data_primap.helper.definitions import domains
- from faostat_data_primap.helper.paths import downloaded_data_path


def download_methodology(url_download: str, save_path: pathlib.Path) -> None:
    """
    Download methodology file.

    Download the methodology PDF from a specified URL and save it to a
    target directory. If a regular file already exists at `save_path`,
    the download is skipped. If the existing file is a symlink (for
    example, one pointing to the copy in a previous release), it is
    removed and a fresh copy is downloaded in its place.

    Parameters
    ----------
    url_download : str
        The URL from which to download the file.
    save_path : pathlib.Path
        The path to the directory where the file should be saved.
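
    Examples
    --------
    Illustrative only; the URL is a placeholder, not a real FAOSTAT
    endpoint:

    >>> download_methodology(
    ...     url_download="https://example.org/Emissions_Totals_methodology.pdf",
    ...     save_path=pathlib.Path("downloaded_data/Emissions_Totals"),
    ... )  # doctest: +SKIP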
- """
- filename = url_download.split("/")[-1]
- download_path = save_path / filename
- if download_path.exists():
- if download_path.is_symlink():
- os.remove(download_path)
- else:
- print(f"Skipping download of {download_path} because it already exists.")
- return
- response = requests.get(url_download, stream=True, timeout=30)
- response.raise_for_status()
- with open(download_path, "wb") as f:
- f.write(response.content)


def get_html_content(url: str) -> BeautifulSoup:
    """
    Get HTML content from a URL.

    Parameters
    ----------
    url
        The URL of the domain overview website.

    Returns
    -------
        The parsed HTML content of the page.
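
    Examples
    --------
    Illustrative; requires a working Chrome installation, so it is
    skipped in automated doctests:

    >>> soup = get_html_content("https://www.fao.org/faostat/en/#data/GT")  # doctest: +SKIP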
- """
- # If the chrome driver isn't found on your system PATH, Selenium
- # will automatically download it for you. Make sure there is no
- # chromedriver installed on your system.
- service = Service()
- options = Options()
- options.add_argument("--headless")
- driver = webdriver.Chrome(service=service, options=options)
- driver.get(url)
- # give time to load javascript
- time.sleep(5)
- html_content = driver.page_source
- return BeautifulSoup(html_content, "html.parser")


def get_last_updated_date(soup: BeautifulSoup, url: str) -> str:
    """
    Get the date when the data set was last updated from the HTML text.

    The FAOSTAT domain overview page includes the date when the
    data set was last updated. We need it to label our downloaded
    data sets. This function searches for and extracts the date
    from the HTML code.

    Parameters
    ----------
    soup
        The BeautifulSoup object with all HTML code of the domain
        overview page.
    url
        The URL of the domain overview page.

    Returns
    -------
        The date when the data set was last updated, formatted as
        "%Y-%m-%d".

    Raises
    ------
    DateTagNotFoundError
        If the tag containing the date is not found in the HTML code.
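
    Examples
    --------
    A minimal sketch; the HTML snippet mimics the date tag on the real
    overview pages, and the URL is only used for error reporting:

    >>> soup = BeautifulSoup(
    ...     '<p data-role="date">November 28, 2023</p>', "html.parser"
    ... )
    >>> get_last_updated_date(soup, "https://www.fao.org/faostat/en/#data/GT")
    '2023-11-28'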
- """
- date_tag = soup.find("p", {"data-role": "date"})
- if not date_tag:
- raise DateTagNotFoundError(url=url)
- last_updated = date_tag.get_text()
- last_updated = datetime.strptime(last_updated, "%B %d, %Y").strftime("%Y-%m-%d")
- return last_updated


def download_file(url_download: str, save_path: pathlib.Path) -> bool:
    """
    Download file.

    If an existing regular file is found at `save_path`, the download is
    skipped. An existing symlink is removed and replaced by a fresh
    download, so that the current release gets its own copy.

    Parameters
    ----------
    url_download
        Remote URL to download the file from
    save_path
        Path to save the downloaded file to

    Returns
    -------
        True if the file was downloaded, False if a cached file was found
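
    Examples
    --------
    Illustrative only; the URL is a placeholder, not a real FAOSTAT
    download endpoint:

    >>> download_file(
    ...     url_download="https://example.org/Emissions_Totals_E_All_Data.zip",
    ...     save_path=pathlib.Path("Emissions_Totals_E_All_Data.zip"),
    ... )  # doctest: +SKIP
    True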
- """
- if save_path.exists():
- if not save_path.is_symlink():
- print(f"Skipping download of {save_path} because it already exists.")
- return False
- os.remove(save_path)
- with requests.get(url_download, stream=True, timeout=30) as response:
- response.raise_for_status()
- with open(save_path, mode="wb") as file:
- file.write(response.content)
- return True


def unzip_file(local_filename: pathlib.Path) -> list[str]:
    """
    Unzip a file into the same directory. Skip files that are already there.

    Parameters
    ----------
    local_filename
        Path to the zip file

    Returns
    -------
        List of the names of the extracted files
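
    Examples
    --------
    A minimal sketch using an archive created on the fly:

    >>> import tempfile
    >>> tmp_dir = pathlib.Path(tempfile.mkdtemp())
    >>> archive = tmp_dir / "data.zip"
    >>> with zipfile.ZipFile(archive, "w") as zf:
    ...     zf.writestr("data.csv", "a,b\n1,2\n")
    >>> unzip_file(archive)
    Extracting 'data.csv'...
    ['data.csv']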
- """
- unzipped_files = []
- if local_filename.suffix == ".zip":
- try:
- with zipfile.ZipFile(str(local_filename), "r") as zip_file:
- for file_info in zip_file.infolist():
- extracted_file_path = local_filename.parent / file_info.filename
- if extracted_file_path.exists():
- if not extracted_file_path.is_symlink():
- print(
- f"File '{file_info.filename}' already exists. "
- f"Skipping extraction."
- )
- continue
- else:
- file_to_unzip_path = (
- local_filename.parent / file_info.filename
- )
- os.remove(file_to_unzip_path)
- print(f"Extracting '{file_info.filename}'...")
- zip_file.extract(file_info, local_filename.parent)
- unzipped_files.append(local_filename.name)
- # TODO Better error logging/visibilty
- except zipfile.BadZipFile:
- print(f"Error while trying to extract " f"{local_filename}")
- except NotImplementedError:
- print("Zip format not supported, " "please unzip on the command line.")
- else:
- print(f"Not attempting to extract " f"{local_filename}.")
- return unzipped_files


def download_all_domains(
    domains: dict[str, dict[str, str]] = domains,
    downloaded_data_path: pathlib.Path = downloaded_data_path,
) -> list[str]:
    """
    Download and unpack all climate-related domains from the FAOSTAT website.

    Extract the date when the data set was last updated and create a
    directory with the same name. Download the zip file for each domain
    if it does not already exist. Unpack the zip file and save it in
    the same directory.

    Parameters
    ----------
    domains
        Mapping from data set name to the URLs of the domain overview
        page, the zip download, and the methodology file.
    downloaded_data_path
        Directory under which the downloaded data is stored.

    Returns
    -------
        List of input files that have been fetched or found locally.
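
    Examples
    --------
    A minimal sketch of the expected `domains` mapping; the key and the
    URLs are placeholders, not the real FAOSTAT endpoints:

    >>> example_domains = {
    ...     "farm_gate_emissions_crops": {
    ...         "url_domain": "https://www.fao.org/faostat/en/#data/GCE",
    ...         "url_download": "https://example.org/Emissions_crops_E_All_Data.zip",
    ...         "url_methodology": "https://example.org/Emissions_crops_methodology.pdf",
    ...     },
    ... }
    >>> download_all_domains(example_domains)  # doctest: +SKIP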
- """
- downloaded_files = []
- for ds_name, urls in domains.items():
- url = urls["url_domain"]
- url_download = urls["url_download"]
- url_methodology = urls["url_methodology"]
- soup = get_html_content(url)
- last_updated = get_last_updated_date(soup, url)
- if not downloaded_data_path.exists():
- downloaded_data_path.mkdir()
- ds_path = downloaded_data_path / ds_name
- if not ds_path.exists():
- ds_path.mkdir()
- local_data_dir = ds_path / last_updated
- if not local_data_dir.exists():
- local_data_dir.mkdir()
- download_methodology(save_path=local_data_dir, url_download=url_methodology)
- local_filename = local_data_dir / f"{ds_name}.zip"
- download_file(url_download=url_download, save_path=local_filename)
- downloaded_files.append(str(local_filename))
- unzip_file(local_filename)
- return downloaded_files