download.py

  1. """Downloads data from FAOSTAT website."""
  2. import hashlib
  3. import os
  4. import pathlib
  5. import time
  6. import zipfile
  7. from datetime import datetime
  8. import requests
  9. from bs4 import BeautifulSoup
  10. from selenium import webdriver
  11. from selenium.webdriver.chrome.options import Options
  12. from selenium.webdriver.chrome.service import Service
  13. from faostat_data_primap.exceptions import DateTagNotFoundError
  14. from faostat_data_primap.helper.definitions import domains
  15. from faostat_data_primap.helper.paths import downloaded_data_path


def find_previous_release_path(
    current_release_path: pathlib.Path,
) -> pathlib.Path | None:
    """
    Find the most recent previous release directory within the same domain.

    Release directories are assumed to be subdirectories of the same parent
    directory as `current_release_path`. Directory names must follow the
    naming convention YYYY-MM-DD so they can be parsed and sorted
    chronologically.

    Parameters
    ----------
    current_release_path : pathlib.Path
        The path of the current release directory.

    Returns
    -------
    pathlib.Path or None
        The path of the most recent previous release directory if one exists,
        otherwise None.
    """
    domain_path = current_release_path.parent
    all_releases = [
        release_name
        for release_name in os.listdir(domain_path)
        if (domain_path / release_name).is_dir()
    ]

    # make sure all directories follow the naming convention
    try:
        all_releases_datetime = [
            datetime.strptime(release, "%Y-%m-%d") for release in all_releases
        ]
    except ValueError as e:
        msg = (
            "All release folders must be in YYYY-MM-DD format, "
            f"got {sorted(all_releases)}"
        )
        raise ValueError(msg) from e

    all_releases_datetime = sorted(all_releases_datetime)
    current_release_datetime = datetime.strptime(current_release_path.name, "%Y-%m-%d")
    index = all_releases_datetime.index(current_release_datetime)

    # if the current release is the earliest or the only one,
    # there is no previous release
    if index == 0:
        return None
    return domain_path / all_releases_datetime[index - 1].strftime("%Y-%m-%d")
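
# Usage sketch (hypothetical directory layout, not real repository data):
#
#     downloaded_data/some_domain/2023-11-09/
#     downloaded_data/some_domain/2024-11-14/
#
#     >>> find_previous_release_path(
#     ...     pathlib.Path("downloaded_data/some_domain/2024-11-14")
#     ... )
#     PosixPath('downloaded_data/some_domain/2023-11-09')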


def calculate_checksum(file_path: pathlib.Path) -> str:
    """
    Calculate the SHA-256 checksum of a file.

    Parameters
    ----------
    file_path : pathlib.Path
        The path to the file for which the checksum is calculated.

    Returns
    -------
    str
        The SHA-256 checksum of the file as a hexadecimal string.
    """
    sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            sha256.update(chunk)
    return sha256.hexdigest()
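
# Example (hypothetical path; the digest shown is illustrative and truncated,
# the real return value is a 64-character hex string):
#     >>> calculate_checksum(
#     ...     pathlib.Path("downloaded_data/some_domain/2024-11-14/methodology.pdf")
#     ... )
#     '9f86d081884c7d65...'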


def download_methodology(url_download: str, save_path: pathlib.Path) -> None:
    """
    Download methodology file.

    Download the methodology PDF file from a specified URL and save it to a
    target directory. If the file already exists in `save_path`, the download
    is skipped. If a previous release directory exists, the function looks for
    the file there and compares checksums: when the file is identical, a
    symlink to the previous release's copy is created instead of downloading a
    duplicate; when it differs or is missing from the previous release, the
    file is downloaded.

    Parameters
    ----------
    url_download : str
        The URL from which to download the file.
    save_path : pathlib.Path
        The path to the directory where the file should be saved.
    """
    filename = url_download.split("/")[-1]
    download_path = save_path / filename

    if download_path.exists():
        print(f"Skipping download of {download_path} because it already exists.")
        return

    previous_release = find_previous_release_path(save_path)
    # Attempt to find a file to compare in the previous release
    if previous_release:
        file_to_compare = previous_release / filename
        if file_to_compare.exists():
            response = requests.get(url_download, stream=True, timeout=30)
            response.raise_for_status()

            file_to_download_checksum = hashlib.sha256(response.content).hexdigest()
            file_to_compare_checksum = calculate_checksum(file_to_compare)

            if file_to_download_checksum == file_to_compare_checksum:
                print(
                    f"File '{filename}' is identical in the previous release. "
                    f"Creating symlink."
                )
                os.symlink(file_to_compare, download_path)
                return
            else:
                print(
                    f"File '{filename}' differs from previous release. "
                    f"Downloading file."
                )
        else:
            print(f"File '{filename}' not found in previous release. Downloading file.")
            response = requests.get(url_download, stream=True, timeout=30)
            response.raise_for_status()

        # Save downloaded file to current release
        with open(download_path, "wb") as f:
            f.write(response.content)
    else:
        print(f"No previous release found. Downloading file '{filename}'.")
        response = requests.get(url_download, stream=True, timeout=30)
        response.raise_for_status()
        with open(download_path, "wb") as f:
            f.write(response.content)
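
# Usage sketch (hypothetical URL and path; the real methodology URLs come
# from `domains` in faostat_data_primap.helper.definitions):
#     >>> download_methodology(
#     ...     url_download="https://example.org/methodology.pdf",
#     ...     save_path=pathlib.Path("downloaded_data/some_domain/2024-11-14"),
#     ... )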


def get_html_content(url: str) -> BeautifulSoup:
    """
    Get html from url.

    Parameters
    ----------
    url
        The url to the domain overview website.

    Returns
    -------
    html content
    """
    # If no chromedriver is found on the system PATH, Selenium will
    # download a matching one automatically. Make sure no (possibly
    # outdated) chromedriver is installed on your system, as it would
    # be used instead.
    service = Service()
    options = Options()
    options.add_argument("--headless")
    driver = webdriver.Chrome(service=service, options=options)
    try:
        driver.get(url)
        # give the page time to load its JavaScript content
        time.sleep(5)
        html_content = driver.page_source
    finally:
        # always close the browser, even if loading the page fails
        driver.quit()
    return BeautifulSoup(html_content, "html.parser")
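
# Usage sketch (the concrete domain overview URLs live in `domains`;
# requires a local Chrome installation):
#     >>> soup = get_html_content("https://example.org/faostat-domain-overview")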


def get_last_updated_date(soup: BeautifulSoup, url: str) -> str:
    """
    Get the date when the data set was last updated from the html text.

    The FAOSTAT domain overview page includes the date when the data set
    was last updated. We need it to label our downloaded data sets. This
    function searches the html code for the date and extracts it.

    Parameters
    ----------
    soup
        The beautiful soup object with all html code of the domain
        overview page.
    url
        The url to the domain overview page.

    Returns
    -------
    date when data set was last updated

    Raises
    ------
    DateTagNotFoundError
        If the tag for the date is not found in the html code
    """
    date_tag = soup.find("p", {"data-role": "date"})
    if not date_tag:
        raise DateTagNotFoundError(url=url)

    last_updated = date_tag.get_text()
    last_updated = datetime.strptime(last_updated, "%B %d, %Y").strftime("%Y-%m-%d")
    return last_updated
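
# Example (markup shape inferred from the parsing logic above):
#     >>> html = '<p data-role="date">December 13, 2023</p>'
#     >>> get_last_updated_date(
#     ...     BeautifulSoup(html, "html.parser"), url="https://example.org"
#     ... )
#     '2023-12-13'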


def download_file(url_download: str, save_path: pathlib.Path) -> bool:
    """
    Download file.

    If an existing file is found at this location, the download is skipped.

    Parameters
    ----------
    url_download
        Remote URL to download the file from
    save_path
        Path to save the downloaded file to

    Returns
    -------
    True if the file was downloaded, False if a cached file was found
    """
    if not save_path.exists():
        with requests.get(url_download, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(save_path, mode="wb") as file:
                file.write(response.content)
        return True
    else:
        print(f"Skipping download of {save_path} because it already exists.")
        return False
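
# Usage sketch (hypothetical URL and target path):
#     >>> download_file(
#     ...     url_download="https://example.org/some_domain.zip",
#     ...     save_path=pathlib.Path(
#     ...         "downloaded_data/some_domain/2024-11-14/some_domain.zip"
#     ...     ),
#     ... )
#     True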


def unzip_file(local_filename: pathlib.Path) -> list[str]:
    """
    Unzip a file into the same directory. Skip files that are already there.

    Parameters
    ----------
    local_filename
        Path to the zip file

    Returns
    -------
    List of unzipped files
    """
    unzipped_files = []
    if local_filename.suffix == ".zip":
        try:
            with zipfile.ZipFile(str(local_filename), "r") as zip_file:
                for file_info in zip_file.infolist():
                    extracted_file_path = local_filename.parent / file_info.filename
                    if extracted_file_path.exists():
                        print(
                            f"File '{file_info.filename}' already exists. "
                            f"Skipping extraction."
                        )
                    else:
                        print(f"Extracting '{file_info.filename}'...")
                        zip_file.extract(file_info, local_filename.parent)
                        unzipped_files.append(file_info.filename)
        # TODO Better error logging/visibility
        except zipfile.BadZipFile:
            print(f"Error while trying to extract {local_filename}")
        except NotImplementedError:
            print("Zip format not supported, please unzip on the command line.")
    else:
        print(f"Not attempting to extract {local_filename}.")
    return unzipped_files
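
# Usage sketch (hypothetical archive; returns the names of newly extracted
# members):
#     >>> unzip_file(
#     ...     pathlib.Path("downloaded_data/some_domain/2024-11-14/some_domain.zip")
#     ... )
#     ['some_domain_data.csv']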


def download_all_domains(
    domains: dict[str, dict[str, str]] = domains,
    downloaded_data_path: pathlib.Path = downloaded_data_path,
) -> list[str]:
    """
    Download and unpack all climate-related domains from the FAOSTAT website.

    Extract the date when the data set was last updated and create a
    directory with the same name. Download the zip file for each domain
    if it does not already exist. Unpack the zip file and save it in
    the same directory.

    Parameters
    ----------
    domains
        Mapping of data set name to the urls of the domain overview page,
        the data download, and the methodology file.
    downloaded_data_path
        The path to save the downloaded data to.

    Returns
    -------
    List of input files that have been fetched or found locally.
    """
    downloaded_files = []
    for ds_name, urls in domains.items():
        url = urls["url_domain"]
        url_download = urls["url_download"]
        url_methodology = urls["url_methodology"]

        soup = get_html_content(url)
        last_updated = get_last_updated_date(soup, url)

        if not downloaded_data_path.exists():
            downloaded_data_path.mkdir()

        ds_path = downloaded_data_path / ds_name
        if not ds_path.exists():
            ds_path.mkdir()

        local_data_dir = ds_path / last_updated
        if not local_data_dir.exists():
            local_data_dir.mkdir()

        download_methodology(save_path=local_data_dir, url_download=url_methodology)

        local_filename = local_data_dir / f"{ds_name}.zip"
        download_file(url_download=url_download, save_path=local_filename)
        downloaded_files.append(str(local_filename))

        unzip_file(local_filename)
    return downloaded_files
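

# Minimal usage sketch. With the default arguments, the domains to fetch and
# the target directory come from the helper modules imported above.
if __name__ == "__main__":
    files = download_all_domains()
    print(f"Fetched or found locally: {files}")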