From 1015298109c562207f8556b2e2789730f520b4b8 Mon Sep 17 00:00:00 2001
From: Ben Kurtovic
Date: Wed, 4 Jun 2014 13:49:52 -0400
Subject: [PATCH] Make it easy to stop crawler/parsers. Cleanup.

---
 bitshift/crawler/crawl.py   | 25 +++++++++++++++++++++----
 bitshift/crawler/crawler.py | 38 +++++++++++++++++++-------------------
 bitshift/crawler/indexer.py | 57 ++++++++++++++++++++++++++++++++++---------------------
 3 files changed, 76 insertions(+), 44 deletions(-)

diff --git a/bitshift/crawler/crawl.py b/bitshift/crawler/crawl.py
index 70c2943..d34deb9 100644
--- a/bitshift/crawler/crawl.py
+++ b/bitshift/crawler/crawl.py
@@ -4,7 +4,12 @@
 Contains functions for initializing all subsidiary, threaded crawlers.
 """
 
-import logging, logging.handlers, os, Queue
+import logging
+import logging.handlers
+import os
+import Queue
+import time
+from threading import Event
 
 from bitshift.crawler import crawler, indexer
 from bitshift.parser import start_parse_servers
@@ -26,15 +31,27 @@ def crawl():
     MAX_URL_QUEUE_SIZE = 5e3
 
     repo_clone_queue = Queue.Queue(maxsize=MAX_URL_QUEUE_SIZE)
-    threads = [crawler.GitHubCrawler(repo_clone_queue),
-               crawler.BitbucketCrawler(repo_clone_queue),
-               indexer.GitIndexer(repo_clone_queue)]
+    run_event = Event()
+    run_event.set()
+    threads = [crawler.GitHubCrawler(repo_clone_queue, run_event),
+               crawler.BitbucketCrawler(repo_clone_queue, run_event),
+               indexer.GitIndexer(repo_clone_queue, run_event)]
 
     for thread in threads:
         thread.start()
 
     parse_servers = start_parse_servers()
+
+    try:
+        while 1:
+            time.sleep(0.1)
+    except KeyboardInterrupt:
+        run_event.clear()
+        for thread in threads:
+            thread.join()
+        for server in parse_servers:
+            server.kill()
 
 def _configure_logging():
     # This isn't ideal, since it means the bitshift python package must be kept
     # inside the app, but it works for now:
diff --git a/bitshift/crawler/crawler.py b/bitshift/crawler/crawler.py
index 9501bd0..e0b9945 100644
--- a/bitshift/crawler/crawler.py
+++ b/bitshift/crawler/crawler.py
@@ -4,12 +4,13 @@
 Contains all website/framework-specific Class crawlers.
 """
 
-import logging, requests, time, threading
+import logging
+import time
+import threading
 
-from bitshift.crawler import indexer
+import requests
 
-from ..codelet import Codelet
-from ..database import Database
+from . import indexer
 
 class GitHubCrawler(threading.Thread):
     """
@@ -30,7 +31,7 @@ class GitHubCrawler(threading.Thread):
         "client_secret" : "8deeefbc2439409c5b7a092fd086772fe8b1f24e"
     }
 
-    def __init__(self, clone_queue):
+    def __init__(self, clone_queue, run_event):
         """
         Create an instance of the singleton `GitHubCrawler`.
 
@@ -40,6 +41,7 @@ class GitHubCrawler(threading.Thread):
         """
 
         self.clone_queue = clone_queue
+        self.run_event = run_event
         self._logger = logging.getLogger("%s.%s" %
                 (__name__, self.__class__.__name__))
         self._logger.info("Starting.")
@@ -59,14 +61,13 @@ class GitHubCrawler(threading.Thread):
         next_api_url = "https://api.github.com/repositories"
         api_request_interval = 5e3 / 60 ** 2
 
-        while len(next_api_url) > 0:
+        while next_api_url and self.run_event.is_set():
            start_time = time.time()
 
            try:
                resp = requests.get(next_api_url, params=self.AUTHENTICATION)
-            except ConnectionError as excep:
-                self._logger.warning("API %s call failed: %s: %s",
-                        next_api_url, excep.__class__.__name__, excep)
+            except requests.ConnectionError:
+                self._logger.exception("API %s call failed:", next_api_url)
                 time.sleep(0.5)
                 continue
 
@@ -169,7 +170,7 @@ class BitbucketCrawler(threading.Thread):
     :ivar _logger: (:class:`logging.Logger`) A class-specific logger object.
""" - def __init__(self, clone_queue): + def __init__(self, clone_queue, run_event): """ Create an instance of the singleton `BitbucketCrawler`. @@ -179,6 +180,7 @@ class BitbucketCrawler(threading.Thread): """ self.clone_queue = clone_queue + self.run_event = run_event self._logger = logging.getLogger("%s.%s" % (__name__, self.__class__.__name__)) self._logger.info("Starting.") @@ -186,7 +188,7 @@ class BitbucketCrawler(threading.Thread): def run(self): """ - Query the Bitbucket API for data about every public repository. + Query the Bitbucket API for data about every public repository. Query the Bitbucket API's "/repositories" endpoint and read its paginated responses in a loop; any "git" repositories have their @@ -196,13 +198,12 @@ class BitbucketCrawler(threading.Thread): next_api_url = "https://api.bitbucket.org/2.0/repositories" - while True: + while self.run_event.is_set(): try: response = requests.get(next_api_url).json() - except ConnectionError as exception: + except requests.ConnectionError: + self._logger.exception("API %s call failed:", next_api_url) time.sleep(0.5) - self._logger.warning("API %s call failed: %s: %s", - next_api_url, excep.__class__.__name__, excep) continue queue_percent_full = (float(self.clone_queue.qsize()) / @@ -220,16 +221,15 @@ class BitbucketCrawler(threading.Thread): clone_url = (clone_links[0]["href"] if clone_links[0]["name"] == "https" else clone_links[1]["href"]) - links.append("clone_url") try: watchers = requests.get( repo["links"]["watchers"]["href"]) rank = len(watchers.json()["values"]) / 100 - except ConnectionError as exception: + except requests.ConnectionError: + err = "API %s call failed:" % next_api_url + self._logger.exception(err) time.sleep(0.5) - self._logger.warning("API %s call failed: %s: %s", - next_api_url, excep.__class__.__name__, excep) continue self.clone_queue.put(indexer.GitRepository( diff --git a/bitshift/crawler/indexer.py b/bitshift/crawler/indexer.py index 5b5e83d..99a1308 100644 --- a/bitshift/crawler/indexer.py +++ b/bitshift/crawler/indexer.py @@ -3,8 +3,17 @@ repositories. """ -import bs4, datetime, logging, os, Queue, re, shutil, string, subprocess, time,\ - threading +import datetime +import logging +import os +import Queue +import shutil +import string +import subprocess +import time +import threading + +import bs4 from ..database import Database from ..parser import parse, UnsupportedFileError @@ -60,7 +69,7 @@ class GitIndexer(threading.Thread): :ivar _logger: (:class:`logging.Logger`) A class-specific logger object. """ - def __init__(self, clone_queue): + def __init__(self, clone_queue, run_event): """ Create an instance of the singleton `GitIndexer`. @@ -72,7 +81,8 @@ class GitIndexer(threading.Thread): MAX_INDEX_QUEUE_SIZE = 10 self.index_queue = Queue.Queue(maxsize=MAX_INDEX_QUEUE_SIZE) - self.git_cloner = _GitCloner(clone_queue, self.index_queue) + self.run_event = run_event + self.git_cloner = _GitCloner(clone_queue, self.index_queue, run_event) self.git_cloner.start() self.database = Database() self._logger = logging.getLogger("%s.%s" % @@ -94,7 +104,7 @@ class GitIndexer(threading.Thread): the queue. 
""" - while True: + while self.run_event.is_set(): while self.index_queue.empty(): time.sleep(THREAD_QUEUE_SLEEP) @@ -114,10 +124,10 @@ class GitIndexer(threading.Thread): :type repo_url: :class:`GitRepository` """ - with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo.name)) as repository_dir: + with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo.name)): try: self._insert_repository_codelets(repo) - except Exception as excep: + except Exception: self._logger.exception("Exception raised while indexing:") finally: if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo.name)): @@ -192,16 +202,16 @@ class GitIndexer(threading.Thread): try: if framework_name == "GitHub": - default_branch = subprocess.check_output("git branch" - " --no-color", shell=True)[2:-1] - return ("%s/blob/%s/%s" % (repo_url, default_branch, - filename)).replace("//", "/") + default_branch = subprocess.check_output("git branch" + " --no-color", shell=True)[2:-1] + return ("%s/blob/%s/%s" % (repo_url, default_branch, + filename)).replace("//", "/") elif framework_name == "Bitbucket": - commit_hash = subprocess.check_output("git rev-parse HEAD", - shell=True).replace("\n", "") - return ("%s/src/%s/%s" % (repo_url, commit_hash, - filename)).replace("//", "/") - except subprocess.CalledProcessError as exception: + commit_hash = subprocess.check_output("git rev-parse HEAD", + shell=True).replace("\n", "") + return ("%s/src/%s/%s" % (repo_url, commit_hash, + filename)).replace("//", "/") + except subprocess.CalledProcessError: return None def _get_git_commits(self): @@ -360,7 +370,7 @@ class GitIndexer(threading.Thread): non_ascii = file_snippet.translate(null_trans, ascii_characters) return not float(len(non_ascii)) / len(file_snippet) > 0.30 - except IOError as exception: + except IOError: return False class _GitCloner(threading.Thread): @@ -377,7 +387,7 @@ class _GitCloner(threading.Thread): :ivar _logger: (:class:`logging.Logger`) A class-specific logger object. """ - def __init__(self, clone_queue, index_queue): + def __init__(self, clone_queue, index_queue, run_event): """ Create an instance of the singleton :class:`_GitCloner`. @@ -390,6 +400,7 @@ class _GitCloner(threading.Thread): self.clone_queue = clone_queue self.index_queue = index_queue + self.run_event = run_event self._logger = logging.getLogger("%s.%s" % (__name__, self.__class__.__name__)) self._logger.info("Starting.") @@ -405,7 +416,7 @@ class _GitCloner(threading.Thread): for the `GitIndexer` to clone; otherwise, it is discarded. """ - while True: + while self.run_event.is_set(): while self.clone_queue.empty(): time.sleep(THREAD_QUEUE_SLEEP) repo = self.clone_queue.get() @@ -413,7 +424,7 @@ class _GitCloner(threading.Thread): try: self._clone_repository(repo) - except Exception as exception: + except Exception: pass def _clone_repository(self, repo): @@ -439,7 +450,7 @@ class _GitCloner(threading.Thread): try: exit_code = subprocess.call(command % (GIT_CLONE_TIMEOUT, repo.url, GIT_CLONE_DIR, repo.name), shell=True) - except Exception as exception: + except Exception: time.sleep(1) command_attempt += 1 if command_attempt == 20: