download.py

  1. """Downloads data from FAOSTAT website."""
  2. import os
  3. import pathlib
  4. import time
  5. import zipfile
  6. from datetime import datetime
  7. import requests
  8. from bs4 import BeautifulSoup
  9. from selenium import webdriver
  10. from selenium.webdriver.chrome.options import Options
  11. from selenium.webdriver.chrome.service import Service
  12. from faostat_data_primap.exceptions import DateTagNotFoundError
  13. from faostat_data_primap.helper.definitions import domains
  14. from faostat_data_primap.helper.paths import downloaded_data_path


def download_methodology(url_download: str, save_path: pathlib.Path) -> None:
    """
    Download the methodology file.

    Download the methodology PDF file from a specified URL and save it to a
    target directory. If a regular file already exists at the target path,
    the download is skipped. If the existing file is a symlink (for example,
    one pointing into a previous release), it is removed and the file is
    downloaded fresh.

    Parameters
    ----------
    url_download : str
        The URL from which to download the file.
    save_path : pathlib.Path
        The path to the directory where the file should be saved.
    """
    filename = url_download.split("/")[-1]
    download_path = save_path / filename

    if download_path.exists():
        if download_path.is_symlink():
            os.remove(download_path)
        else:
            print(f"Skipping download of {download_path} because it already exists.")
            return

    with requests.get(url_download, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(download_path, "wb") as f:
            # write in chunks rather than loading the whole response into memory
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
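
# A minimal usage sketch (the URL and path below are placeholders; the real
# values come from faostat_data_primap.helper.definitions):
#
#     download_methodology(
#         url_download="https://<fao-host>/<methodology>.pdf",
#         save_path=pathlib.Path("downloaded_data/<domain>/<release-date>"),
#     )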


def get_html_content(url: str) -> BeautifulSoup:
    """
    Get the HTML content of a URL.

    Parameters
    ----------
    url
        The URL of the domain overview website.

    Returns
    -------
        The parsed HTML content.
    """
    # If no chromedriver is found on the system PATH, Selenium Manager
    # (Selenium >= 4.6) downloads a matching driver automatically. A stale
    # chromedriver on the PATH takes precedence, so remove it if you run
    # into version mismatch errors.
    service = Service()
    options = Options()
    options.add_argument("--headless")
    driver = webdriver.Chrome(service=service, options=options)
    driver.get(url)
    # give the page time to execute its JavaScript
    time.sleep(5)
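    # An explicit wait would be more robust than a fixed sleep. A sketch,
    # assuming the last-updated paragraph is the element we need rendered:
    #
    #     from selenium.webdriver.common.by import By
    #     from selenium.webdriver.support import expected_conditions as EC
    #     from selenium.webdriver.support.ui import WebDriverWait
    #
    #     WebDriverWait(driver, timeout=30).until(
    #         EC.presence_of_element_located((By.CSS_SELECTOR, "p[data-role='date']"))
    #     )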
    html_content = driver.page_source
    # shut the browser down so headless Chrome processes don't accumulate
    driver.quit()
    return BeautifulSoup(html_content, "html.parser")


def get_last_updated_date(soup: BeautifulSoup, url: str) -> str:
    """
    Get the date when the data set was last updated from the HTML text.

    The FAOSTAT domain overview page includes the date when the data set
    was last updated. We need it to label our downloaded data sets. This
    function searches for the date and extracts it from the HTML code.

    Parameters
    ----------
    soup
        The BeautifulSoup object with all HTML code of the domain
        overview page.
    url
        The URL of the domain overview page.

    Returns
    -------
        The date (YYYY-MM-DD) when the data set was last updated.

    Raises
    ------
    DateTagNotFoundError
        If the tag for the date is not found in the HTML code.
    """
    date_tag = soup.find("p", {"data-role": "date"})
    if not date_tag:
        raise DateTagNotFoundError(url=url)

    last_updated = date_tag.get_text()
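    # normalise the site's long date format to ISO, e.g.
    # "December 13, 2023" -> "2023-12-13"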
    last_updated = datetime.strptime(last_updated, "%B %d, %Y").strftime("%Y-%m-%d")
    return last_updated


def download_file(url_download: str, save_path: pathlib.Path) -> bool:
    """
    Download a file.

    If an existing file is found at this location, the download is skipped.

    Parameters
    ----------
    url_download
        Remote URL to download the file from.
    save_path
        Path to save the downloaded file to.

    Returns
    -------
        True if the file was downloaded, False if a cached file was found.
    """
    if save_path.exists():
        if not save_path.is_symlink():
            print(f"Skipping download of {save_path} because it already exists.")
            return False
        os.remove(save_path)

    with requests.get(url_download, stream=True, timeout=30) as response:
        response.raise_for_status()
        with open(save_path, mode="wb") as file:
            # stream the archive to disk in chunks instead of holding it in memory
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)

    return True
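
# The boolean return value lets callers skip follow-up work on cache hits,
# e.g. (sketch):
#
#     if download_file(url_download=url, save_path=local_filename):
#         unzip_file(local_filename)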


def unzip_file(local_filename: pathlib.Path) -> list[str]:
    """
    Unzip a file into its directory. Skip members that are already there.

    Parameters
    ----------
    local_filename
        Path to the zip file.

    Returns
    -------
        List of unzipped files.
    """
    unzipped_files = []
    if local_filename.suffix == ".zip":
        try:
            with zipfile.ZipFile(str(local_filename), "r") as zip_file:
                for file_info in zip_file.infolist():
                    extracted_file_path = local_filename.parent / file_info.filename
                    if extracted_file_path.exists():
                        if not extracted_file_path.is_symlink():
                            print(
                                f"File '{file_info.filename}' already exists. "
                                f"Skipping extraction."
                            )
                            continue
                        # replace a symlink into a previous release with the
                        # freshly extracted file
                        os.remove(extracted_file_path)
                    print(f"Extracting '{file_info.filename}'...")
                    zip_file.extract(file_info, local_filename.parent)
                    unzipped_files.append(file_info.filename)
        # TODO Better error logging/visibility
        except zipfile.BadZipFile:
            print(f"Error while trying to extract {local_filename}")
        except NotImplementedError:
            print("Zip format not supported, please unzip on the command line.")
    else:
        print(f"Not attempting to extract {local_filename}.")
    return unzipped_files
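
# A quick way to inspect an archive's members before extracting, using the
# standard zipfile API (the archive name here is hypothetical):
#
#     with zipfile.ZipFile("Emissions_Totals_E_All_Data.zip") as zf:
#         print(zf.namelist())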


def download_all_domains(
    domains: dict[str, dict[str, str]] = domains,
    downloaded_data_path: pathlib.Path = downloaded_data_path,
) -> list[str]:
    """
    Download and unpack all climate-related domains from the FAOSTAT website.

    Extract the date when the data set was last updated and create a directory
    with the same name. Download the zip file for each domain if it does not
    already exist. Unpack the zip file and save it in the same directory.

    Parameters
    ----------
    domains
        Mapping from data set name to the domain overview URL,
        the download URL, and the methodology URL.
    downloaded_data_path
        Directory under which the downloaded releases are stored.

    Returns
    -------
        List of input files that have been fetched or found locally.
    """
    downloaded_files = []
    for ds_name, urls in domains.items():
        url = urls["url_domain"]
        url_download = urls["url_download"]
        url_methodology = urls["url_methodology"]

        soup = get_html_content(url)
        last_updated = get_last_updated_date(soup, url)

        if not downloaded_data_path.exists():
            downloaded_data_path.mkdir()

        ds_path = downloaded_data_path / ds_name
        if not ds_path.exists():
            ds_path.mkdir()

        local_data_dir = ds_path / last_updated
        if not local_data_dir.exists():
            local_data_dir.mkdir()

        download_methodology(save_path=local_data_dir, url_download=url_methodology)

        local_filename = local_data_dir / f"{ds_name}.zip"
        download_file(url_download=url_download, save_path=local_filename)
        downloaded_files.append(str(local_filename))

        unzip_file(local_filename)

    return downloaded_files
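

# A minimal sketch of driving this module directly (assumes the defaults from
# helper.definitions and helper.paths are what you want):
#
#     if __name__ == "__main__":
#         files = download_all_domains()
#         print(f"Fetched or found {len(files)} archives.")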