download.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. """Downloads data from FAOSTAT website."""
  2. import hashlib
  3. import os
  4. import pathlib
  5. import time
  6. import zipfile
  7. from datetime import datetime
  8. import bs4
  9. import requests
  10. from bs4 import BeautifulSoup
  11. from selenium import webdriver
  12. from selenium.webdriver.chrome.service import Service
  13. from faostat_data_primap.exceptions import DateTagNotFoundError
  14. def find_previous_release_path(
  15. current_release_path: pathlib.PosixPath,
  16. ) -> pathlib.PosixPath | None:
  17. """
  18. Find the most recent previous release directory within same domain
  19. Release directories are assumed to be subdirectories within the same parent
  20. directory as `current_release_path`. The Sorting is done alphabetically,
  21. so directory names should follow the naming convention YYYY-MM-DD
  22. Parameters
  23. ----------
  24. current_release_path : pathlib.PosixPath
  25. The path of the current release directory.
  26. Returns
  27. -------
  28. pathlib.PosixPath or None
  29. Returns the path of the most recent previous release directory if one exists,
  30. otherwise returns None.
  31. """
  32. domain_path = current_release_path.parent
  33. all_releases = [
  34. release_name for release_name in os.listdir(current_release_path.parent)
  35. ]
  36. # make sure all directories follow the naming convention
  37. try:
  38. all_releases_datetime = [
  39. datetime.strptime(release, "%Y-%m-%d") for release in all_releases
  40. ]
  41. except ValueError as e:
  42. msg = f"All release folders must be in YYYY-MM-DD format, got {all_releases=}"
  43. raise ValueError(msg) from e
  44. all_releases_datetime = sorted(all_releases_datetime)
  45. current_release_datetime = datetime.strptime(current_release_path.name, "%Y-%m-%d")
  46. index = all_releases_datetime.index(current_release_datetime)
  47. # if the current release is the latest or the only one
  48. if index == 0:
  49. return None
  50. return domain_path / all_releases_datetime[index - 1].strftime("%Y-%m-%d")
  51. def calculate_checksum(file_path) -> str:
  52. """
  53. Calculate the SHA-256 checksum of a file.
  54. Parameters
  55. ----------
  56. file_path : pathlib.PosixPath
  57. The path to the file for which the checksum is calculated.
  58. Returns
  59. -------
  60. str
  61. The SHA-256 checksum of the file as a hexadecimal string.
  62. """
  63. sha256 = hashlib.sha256()
  64. with open(file_path, "rb") as f:
  65. for chunk in iter(lambda: f.read(4096), b""):
  66. sha256.update(chunk)
  67. return sha256.hexdigest()
  68. def download_methodology(url_download: str, save_path: pathlib.PosixPath):
  69. """
  70. Download methodology file.
  71. Download the methodology PDF-file from a specified URL and save to a
  72. target directory. If the file already exists in `save_path`,
  73. the download is skipped. If a previous release directory exists,
  74. the function attempts to locate the file there and compares checksums
  75. to avoid downloading an identical file. If it exists in the previous release,
  76. but it's not identical it is downloaded. If the file exists in the previous
  77. release directory and is identical, a symlink will be created instead of downloading
  78. to avoid duplicate downloads. If the file does not exist in a previous release,
  79. it will be downloaded.
  80. Parameters
  81. ----------
  82. url_download : str
  83. The URL from which to download the file.
  84. save_path : pathlib.PosixPath
  85. The path to the directory where the file should be saved.
  86. """
  87. filename = url_download.split("/")[-1]
  88. download_path = save_path / filename
  89. if download_path.exists():
  90. print(f"Skipping download of {download_path} because it already exists.")
  91. return
  92. previous_release = find_previous_release_path(save_path)
  93. # Attempt to find a file to compare in the previous release
  94. if previous_release:
  95. file_to_compare = previous_release / filename
  96. if file_to_compare.exists():
  97. response = requests.get(url_download, stream=True, timeout=30)
  98. response.raise_for_status()
  99. file_to_download_checksum = hashlib.sha256(response.content).hexdigest()
  100. file_to_compare_checksum = calculate_checksum(file_to_compare)
  101. if file_to_download_checksum == file_to_compare_checksum:
  102. print(
  103. f"File '{filename}' is identical in the previous release. "
  104. f"Creating symlink."
  105. )
  106. os.symlink(file_to_compare, download_path)
  107. return
  108. else:
  109. print(
  110. f"File '{filename}' differs from previous release. "
  111. f"Downloading file."
  112. )
  113. else:
  114. print(f"File '{filename}' not found in previous release. Downloading file.")
  115. response = requests.get(url_download, stream=True, timeout=30)
  116. response.raise_for_status()
  117. # Save downloaded file to current release
  118. with open(download_path, "wb") as f:
  119. f.write(response.content)
  120. else:
  121. print(f"No previous release found. Downloading file '{filename}'.")
  122. response = requests.get(url_download, stream=True, timeout=30)
  123. response.raise_for_status()
  124. with open(download_path, "wb") as f:
  125. f.write(response.content)
  126. def get_html_content(url: str) -> bs4.BeautifulSoup:
  127. """
  128. Get html from url.
  129. Parameters
  130. ----------
  131. url
  132. The url to the domain overview website.
  133. Returns
  134. -------
  135. html content
  136. """
  137. # If the chrome driver isn't found on your system PATH, Selenium
  138. # will automatically download it for you. Make sure there is no
  139. # chromedriver installed on your system.
  140. service = Service()
  141. driver = webdriver.Chrome(service=service)
  142. driver.get(url)
  143. # give time to load javascript
  144. time.sleep(3)
  145. html_content = driver.page_source
  146. return BeautifulSoup(html_content, "html.parser")
  147. def get_last_updated_date(soup: bs4.BeautifulSoup, url: str) -> str:
  148. """
  149. Get the date when data set way last updated from html text
  150. The FAO stat domain overview page includes a date when
  151. the data set was last updated. We need it to label our downloaded
  152. data sets. This function searches and extracts the date
  153. from the html code.
  154. Parameters
  155. ----------
  156. soup
  157. The beautiful soup object with all html code of the domain
  158. overview page.
  159. url
  160. The url to the domain overview page.
  161. Returns
  162. -------
  163. date when data set was last updated
  164. Raises
  165. ------
  166. DateTagNotFoundError
  167. If the tag for the date is not found in the html code
  168. """
  169. date_tag = soup.find("p", {"data-role": "date"})
  170. if not date_tag:
  171. raise DateTagNotFoundError(url=url)
  172. last_updated = date_tag.get_text()
  173. last_updated = datetime.strptime(last_updated, "%B %d, %Y").strftime("%Y-%m-%d")
  174. return last_updated
  175. def download_file(url_download: str, save_path: pathlib.PosixPath):
  176. """
  177. Download file.
  178. If an existing file is found at this location, the download is skipped.
  179. Parameters
  180. ----------
  181. url_download
  182. Remote URL to download the file from
  183. save_path
  184. Path to save the downloaded file to
  185. Returns
  186. -------
  187. True if the file was downloaded, False if a cached file was found
  188. """
  189. if not save_path.exists():
  190. with requests.get(url_download, stream=True, timeout=30) as response:
  191. response.raise_for_status()
  192. with open(save_path, mode="wb") as file:
  193. file.write(response.content)
  194. return True
  195. else:
  196. print(f"Skipping download of {save_path}" " because it already exists.")
  197. return False
  198. def unzip_file(local_filename: pathlib.PosixPath):
  199. """
  200. Unzip files in same directory. Skip if files are already there
  201. Parameters
  202. ----------
  203. local_filename
  204. Path to the zip file
  205. Returns
  206. -------
  207. List of unzipped files
  208. """
  209. unzipped_files = []
  210. if local_filename.suffix == ".zip":
  211. try:
  212. with zipfile.ZipFile(str(local_filename), "r") as zip_file:
  213. for file_info in zip_file.infolist():
  214. extracted_file_path = local_filename.parent / file_info.filename
  215. if extracted_file_path.exists():
  216. print(
  217. f"File '{file_info.filename}' already exists. "
  218. f"Skipping extraction."
  219. )
  220. else:
  221. print(f"Extracting '{file_info.filename}'...")
  222. zip_file.extract(file_info, local_filename.parent)
  223. unzipped_files.append(local_filename)
  224. # TODO Better error logging/visibilty
  225. except zipfile.BadZipFile:
  226. print(f"Error while trying to extract " f"{local_filename}")
  227. except NotImplementedError:
  228. print("Zip format not supported, " "please unzip on the command line.")
  229. else:
  230. print(f"Not attempting to extract " f"{local_filename}.")
  231. return unzipped_files