download.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """Downloads data from FAOSTAT website."""
  2. import pathlib
  3. import time
  4. import zipfile
  5. from datetime import datetime
  6. import bs4
  7. import requests
  8. from bs4 import BeautifulSoup
  9. from selenium import webdriver
  10. from selenium.webdriver.chrome.service import Service
  11. from faostat_data_primap.exceptions import DateTagNotFoundError
  12. def get_html_content(url: str) -> bs4.BeautifulSoup:
  13. """
  14. Get html from url.
  15. Parameters
  16. ----------
  17. url
  18. The url to the domain overview website.
  19. Returns
  20. -------
  21. html content
  22. """
  23. # If the chrome driver isn't found on your system PATH, Selenium
  24. # will automatically download it for you. Make sure there is no
  25. # chromedriver installed on your system.
  26. service = Service()
  27. driver = webdriver.Chrome(service=service)
  28. driver.get(url)
  29. # give time to load javascript
  30. time.sleep(3)
  31. html_content = driver.page_source
  32. return BeautifulSoup(html_content, "html.parser")
  33. def get_last_updated_date(soup: bs4.BeautifulSoup, url: str) -> str:
  34. """
  35. Get the date when data set way last updated from html text
  36. The FAO stat domain overview page includes a date when
  37. the data set was last updated. We need it to label our downloaded
  38. data sets. This function searches and extracts the date
  39. from the html code.
  40. Parameters
  41. ----------
  42. soup
  43. The beautiful soup object with all html code of the domain
  44. overview page.
  45. url
  46. The url to the domain overview page.
  47. Returns
  48. -------
  49. date when data set was last updated
  50. Raises
  51. ------
  52. DateTagNotFoundError
  53. If the tag for the date is not found in the html code
  54. """
  55. date_tag = soup.find("p", {"data-role": "date"})
  56. if not date_tag:
  57. raise DateTagNotFoundError(url=url)
  58. last_updated = date_tag.get_text()
  59. last_updated = datetime.strptime(last_updated, "%B %d, %Y").strftime("%Y-%m-%d")
  60. return last_updated
  61. def download_file(url_download: str, save_path: pathlib.PosixPath):
  62. """
  63. Download file.
  64. If an existing file is found at this location, the download is skipped.
  65. Parameters
  66. ----------
  67. url_download
  68. Remote URL to download the file from
  69. save_path
  70. Path to save the downloaded file to
  71. Returns
  72. -------
  73. True if the file was downloaded, False if a cached file was found
  74. """
  75. if not save_path.exists():
  76. with requests.get(url_download, stream=True, timeout=30) as response:
  77. response.raise_for_status()
  78. with open(save_path, mode="wb") as file:
  79. file.write(response.content)
  80. return True
  81. else:
  82. print(f"Skipping download of {save_path}" " because it already exists.")
  83. return False
  84. def unzip_file(local_filename: pathlib.PosixPath):
  85. """
  86. Unzip files in same directory. Skip if files are already there
  87. Parameters
  88. ----------
  89. local_filename
  90. Path to the zip file
  91. Returns
  92. -------
  93. List of unzipped files
  94. """
  95. unzipped_files = []
  96. if local_filename.suffix == ".zip":
  97. try:
  98. with zipfile.ZipFile(str(local_filename), "r") as zip_file:
  99. for file_info in zip_file.infolist():
  100. extracted_file_path = local_filename.parent / file_info.filename
  101. if extracted_file_path.exists():
  102. print(
  103. f"File '{file_info.filename}' already exists. "
  104. f"Skipping extraction."
  105. )
  106. else:
  107. print(f"Extracting '{file_info.filename}'...")
  108. zip_file.extract(file_info, local_filename.parent)
  109. unzipped_files.append(local_filename)
  110. # TODO Better error logging/visibilty
  111. except zipfile.BadZipFile:
  112. print(f"Error while trying to extract " f"{local_filename}")
  113. except NotImplementedError:
  114. print("Zip format not supported, " "please unzip on the command line.")
  115. else:
  116. print(f"Not attempting to extract " f"{local_filename}.")
  117. return unzipped_files