diff --git a/allofplos/article_class.py b/allofplos/article_class.py
index 864baf6d..d78aa038 100644
--- a/allofplos/article_class.py
+++ b/allofplos/article_class.py
@@ -2,9 +2,12 @@
 import re
 import subprocess
 
+from io import BytesIO
+
 import lxml.etree as et
 import requests
+
 from allofplos.transformations import (filename_to_doi, EXT_URL_TMP, INT_URL_TMP,
                                        BASE_URL_ARTICLE_LANDING_PAGE)
 from allofplos.plos_regex import (validate_doi, corpusdir)
@@ -1097,3 +1100,27 @@ def from_filename(cls, filename):
         """Initiate an article object using a local XML file.
         """
         return cls(filename_to_doi(filename))
+
+    @classmethod
+    def from_bytes(cls, resp, directory=corpusdir, write=False, overwrite=True):
+        tree = et.parse(BytesIO(resp))
+        root = tree.getroot()
+        tag_path = ["/",
+                    "article",
+                    "front",
+                    "article-meta",
+                    "article-id"]
+        tag_location = '/'.join(tag_path)
+        article_ids = root.xpath(tag_location)
+        for art_id in article_ids:
+            if art_id.get('pub-id-type') == 'doi':
+                temp = cls(art_id.text, directory=directory)
+                temp._tree = tree
+                if write and (not os.path.isfile(temp.filename) or overwrite):
+                    with open(temp.filename, 'w') as file:
+                        file.write(et.tostring(tree, method='xml', encoding='unicode'))
+                break
+        return temp
+
+
+
diff --git a/allofplos/async_utils/__init__.py b/allofplos/async_utils/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/allofplos/async_utils/fetch_test.py b/allofplos/async_utils/fetch_test.py
new file mode 100644
index 00000000..c21a200d
--- /dev/null
+++ b/allofplos/async_utils/fetch_test.py
@@ -0,0 +1,143 @@
+import asyncio
+import aiohttp
+import requests
+import time
+import os
+import shutil
+
+
+import lxml.etree as et
+from timeit import default_timer
+
+from allofplos.plos_corpus import listdir_nohidden
+from allofplos.plos_regex import ALLOFPLOS_DIR_PATH, corpusdir
+from allofplos.transformations import URL_TMP, url_to_doi
+from allofplos.samples.corpus_analysis import get_all_local_dois
+from allofplos import Article
+
+MIN_DELAY = 1.0  # minimum wait before beginning the next http-request (in s)
+MIN_FILES = 9990  # index of the files to start with
+NUM_FILES = 10  # how many files to process
+
+ASYNC_DIRECTORY = os.path.join(ALLOFPLOS_DIR_PATH, "async_test_dir")
+SYNC_DIRECTORY = os.path.join(ALLOFPLOS_DIR_PATH, "sync_test_dir")
+
+async def fetch(doi, session):
+    """Given a doi, fetch the associated url, using the given asynchronous
+    session (a ClientSession) as a context manager.
+
+    Returns the article created by transforming the content of the response.
+
+    NB: This needs to do better error handling if the url fails or points to an
+    invalid xml file.
+    """
+    url = URL_TMP.format(doi)
+    async with session.get(url) as response:
+        resp = await response.read()
+        article = Article.from_bytes(resp,
+                                     directory=ASYNC_DIRECTORY,
+                                     write=True,
+                                     overwrite=True)
+    return article
+
+async def fetch_all(dois, max_rate=MIN_DELAY, limit_per_host=3.0):
+    """Launch requests for each doi.
+
+    This first fetches all of the dois passed in.
+
+    Then it checks for the existence of dois that are corrected articles that
+    should also be downloaded.
+ """ + + tasks = [] + conn = aiohttp.TCPConnector(limit_per_host=limit_per_host) + async with aiohttp.ClientSession(connector=conn) as session: + for doi in dois: + await asyncio.sleep(max_rate) # ensures no more requests than max_rate per second + task = asyncio.ensure_future(fetch(doi, session)) + tasks.append(task) # create list of tasks + + first_batch = await asyncio.gather(*tasks) # gather task responses + corrected_dois = [article.related_doi + for article in first_batch + if article.type_=="correction"] + for doi in corrected_dois: + await asyncio.sleep(max_rate) # ensures no more requests than max_rate per second + task = asyncio.ensure_future(fetch(doi, session)) + tasks.append(task) # create list of tasks + + second_batch = await asyncio.gather(*tasks) # gather task responses + + + +def sequential_fetch(doi): + """ + Fetch urls on the basis of the doi being passed in as part of a sequential + process. + + Returns the article created by transforming the content of the response. + + NB: This needs to do better error handling if the url fails or points to an + invalid xml file. + """ + url = URL_TMP.format(doi) + response = requests.get(url) + time.sleep(MIN_DELAY) + article = Article.from_bytes(response.text.encode('utf-8'), + directory=ASYNC_DIRECTORY, + write=True) + return article + +def demo_sequential(dois): + """Organises the process of downloading articles associated with dois + to SYNC_DIRECTORY sequentially. + + Side-effect: prints a timer to indicate how long it took. + """ + recreate_dir(SYNC_DIRECTORY) + start_time = default_timer() + for doi in dois: + start_time_url = default_timer() + article = sequential_fetch(doi) + if article.type_ == "correction": + new_article = sequential_fetch(article.related_doi) + + tot_elapsed = default_timer() - start_time + print(' TOTAL SECONDS: '.rjust(30, '-') + '{0:5.2f} '. \ + format(tot_elapsed, '\n')) + + +def demo_async(dois): + """Organises the process of downloading articles associated with the doi to + ASYNC_DIRECTORY asynchronous functionality. + + Side-effect: prints a timer to indicate how long it took. + """ + recreate_dir(ASYNC_DIRECTORY) + start_time = default_timer() + loop = asyncio.get_event_loop() # event loop + future = asyncio.ensure_future(fetch_all(dois)) # tasks to do + loop.run_until_complete(future) # loop until done + loop.run_until_complete(asyncio.sleep(0)) + loop.close() + tot_elapsed = default_timer() - start_time + print(' TOTAL SECONDS: '.rjust(30, '-') + '{0:5.2f} '. \ + format(tot_elapsed, '\n')) + +def recreate_dir(directory): + """Removes and recreates the directory. + """ + if os.path.isdir(directory): + shutil.rmtree(directory) + os.makedirs(directory, exist_ok=True) + +def main(): + """Main loop for running and comparing the different appraoches. 
+ """ + + dois = get_all_local_dois(corpusdir)[MIN_FILES:MIN_FILES+NUM_FILES] + demo_sequential(dois) + demo_async(dois) + +if __name__ == '__main__': + main() diff --git a/allofplos/plos_corpus.py b/allofplos/plos_corpus.py index a1da70d9..1093da6b 100644 --- a/allofplos/plos_corpus.py +++ b/allofplos/plos_corpus.py @@ -218,10 +218,7 @@ def repo_download(dois, tempdir, ignore_existing=True, plos_network=False): :param ignore_existing: Don't re-download to tempdir if already downloaded """ # make temporary directory, if needed - try: - os.mkdir(tempdir) - except FileExistsError: - pass + os.makedirs(tempdir, exist_ok=True) if ignore_existing: existing_articles = [filename_to_doi(file) for file in listdir_nohidden(tempdir)] @@ -423,10 +420,7 @@ def download_updated_xml(article_file, :return: boolean for whether update was available & downloaded """ doi = filename_to_doi(article_file) - try: - os.mkdir(tempdir) - except FileExistsError: - pass + os.makedirs(tempdir, exist_ok=True) url = URL_TMP.format(doi) articletree_remote = et.parse(url) articleXML_remote = et.tostring(articletree_remote, method='xml', encoding='unicode') @@ -693,10 +687,7 @@ def remote_proofs_direct_check(tempdir=newarticledir, article_list=None, plos_ne :param article-list: list of uncorrected proofs to check for updates. :return: list of all articles with updated vor """ - try: - os.mkdir(tempdir) - except FileExistsError: - pass + os.makedirs(tempdir, exist_ok=True) proofs_download_list = [] if article_list is None: article_list = get_uncorrected_proofs_list() @@ -866,9 +857,7 @@ def create_local_plos_corpus(corpusdir=corpusdir, rm_metadata=True): :param rm_metadata: COMPLETE HERE :return: None """ - if os.path.isdir(corpusdir) is False: - os.mkdir(corpusdir) - print('Creating folder for article xml') + os.makedirs(corpusdir, exist_ok=True) zip_date, zip_size, metadata_path = get_zip_metadata() zip_path = download_file_from_google_drive(zip_id, local_zip, file_size=zip_size) unzip_articles(file_path=zip_path) @@ -885,9 +874,7 @@ def create_test_plos_corpus(corpusdir=corpusdir): :param corpusdir: directory where the corpus is to be downloaded and extracted :return: None """ - if os.path.isdir(corpusdir) is False: - os.mkdir(corpusdir) - print('Creating folder for article xml') + os.makedirs(corpusdir, exist_ok=True) zip_path = download_file_from_google_drive(test_zip_id, local_test_zip) unzip_articles(file_path=zip_path, extract_directory=corpusdir) diff --git a/allofplos/samples/corpus_analysis.py b/allofplos/samples/corpus_analysis.py index 0b51541d..b1309731 100644 --- a/allofplos/samples/corpus_analysis.py +++ b/allofplos/samples/corpus_analysis.py @@ -290,10 +290,7 @@ def revisiondate_sanity_check(article_list=None, tempdir=newarticledir, director article_list = sorted(pubdates, key=pubdates.__getitem__, reverse=True) article_list = article_list[:30000] - try: - os.mkdir(tempdir) - except FileExistsError: - pass + os.makedirs(tempdir, exist_ok=True) articles_different_list = [] max_value = len(article_list) bar = progressbar.ProgressBar(redirect_stdout=True, max_value=max_value) diff --git a/setup.py b/setup.py index 00c25d96..8152ab80 100644 --- a/setup.py +++ b/setup.py @@ -3,9 +3,9 @@ import sys if sys.version_info.major < 3: - sys.exit('Sorry, Python < 3.4 is not supported') -elif sys.version_info.minor < 4: - sys.exit('Sorry, Python < 3.4 is not supported') + sys.exit('Sorry, Python < 3.5 is not supported') +elif sys.version_info.minor < 5: + sys.exit('Sorry, Python < 3.5 is not supported') here = 
 here = path.abspath(path.dirname(__file__))
@@ -27,7 +27,6 @@
         'Intended Audience :: Science/Research',
         'Topic :: Scientific/Engineering',
         'License :: OSI Approved :: MIT License',
-        'Programming Language :: Python :: 3.4',
         'Programming Language :: Python :: 3.5',
         'Programming Language :: Python :: 3.6',
     ],
@@ -50,7 +49,7 @@
         'tqdm==4.17.1',
         'urllib3==1.22',
     ],
-    python_requires='>=3.4',
+    python_requires='>=3.5',
     # If there are data files included in your packages that need to be
     # installed, specify them here.  If using Python 2.6 or less, then these
     # have to be included in MANIFEST.in as well.
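
A minimal usage sketch of the new Article.from_bytes classmethod introduced in this patch, driven synchronously in the same way sequential_fetch does in fetch_test.py. URL_TMP comes from allofplos.transformations as imported there; the DOI is purely illustrative, and the default corpus directory is assumed to exist when write=True.

    import requests

    from allofplos import Article
    from allofplos.transformations import URL_TMP

    # Illustrative DOI; substitute any PLOS article DOI.
    doi = '10.1371/journal.pone.0052690'

    # Fetch the article XML and build an Article directly from the response bytes,
    # writing the XML into the default corpus directory (as the async fetch does).
    resp = requests.get(URL_TMP.format(doi))
    article = Article.from_bytes(resp.content, write=True, overwrite=True)
    print(article.doi)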