From 3ce399adbf5ebae2fcff017c8c680e21be31d4a7 Mon Sep 17 00:00:00 2001
From: Severyn Kozak
Date: Thu, 17 Apr 2014 14:05:12 -0400
Subject: [PATCH] Add threaded cloner, GitRepository class (#7).

Add: bitshift/crawler/
    (crawler, indexer).py
        -add a 'time.sleep()' call whenever a thread is blocking on items in
        a Queue, to prevent excessive polling (which hogs system resources).

    indexer.py
        -move 'git clone' functionality from the 'GitIndexer' singleton to a
        separate, threaded '_GitCloner'.
        -'crawler.GitHubCrawler' now shares a "clone" queue with '_GitCloner',
        which shares an "index" queue with 'GitIndexer'.
        -both indexing and cloning are time-intensive processes, so this
        improvement should (hypothetically) boost performance.
        -add `GitRepository` class, instances of which are passed around in
        the queues.
---
 bitshift/crawler/crawler.py |  51 ++++++------
 bitshift/crawler/indexer.py | 187 ++++++++++++++++++++++++++++++++++---------
 2 files changed, 169 insertions(+), 69 deletions(-)
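(Reviewer note, placed after the diffstat so `git am` ignores it: a minimal
sketch of how the new pipeline is meant to be wired together. The entry point
and the `MAX_CLONE_QUEUE_SIZE` bound are assumptions for illustration, not
code from this commit.)

    import Queue

    from bitshift.crawler import crawler, indexer

    MAX_CLONE_QUEUE_SIZE = 50  # assumed bound; any positive maxsize works

    # GitHubCrawler produces GitRepository objects into the shared "clone"
    # queue; GitIndexer's embedded _GitCloner consumes them, clones each
    # repository, and hands it to the indexer on the internal "index" queue.
    clone_queue = Queue.Queue(maxsize=MAX_CLONE_QUEUE_SIZE)
    crawler_thread = crawler.GitHubCrawler(clone_queue)
    indexer_thread = indexer.GitIndexer(clone_queue)

    crawler_thread.start()
    indexer_thread.start()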
diff --git a/bitshift/crawler/crawler.py b/bitshift/crawler/crawler.py
index edd8eaf..8509c6d 100644
--- a/bitshift/crawler/crawler.py
+++ b/bitshift/crawler/crawler.py
@@ -6,7 +6,7 @@ Contains all website/framework-specific Class crawlers.
 
 import logging, requests, time, threading
 
-import bitshift.crawler.indexer
+from bitshift.crawler import indexer
 
 from ..codelet import Codelet
 from ..database import Database
@@ -19,31 +19,22 @@ class GitHubCrawler(threading.Thread):
     to its public repositories, which it inserts into a :class:`Queue.Queue`
     shared with :class:`bitshift.crawler.indexer.GitIndexer`.
 
-    :ivar repository_queue: (:class:`Queue.Queue`) Contains dictionaries with
-        repository information retrieved by `GitHubCrawler`, and other Git
-        crawlers, to be processed by
+    :ivar clone_queue: (:class:`Queue.Queue`) Contains :class:`GitRepository`
+        objects with repository metadata retrieved by :class:`GitHubCrawler`,
+        and other Git crawlers, to be processed by
         :class:`bitshift.crawler.indexer.GitIndexer`.
     """
 
-    def __init__(self, repository_queue):
+    def __init__(self, clone_queue):
         """
         Create an instance of the singleton `GitHubCrawler`.
 
-        :param repository_queue: A queue containing dictionaries of repository
-            metadata retrieved by `GitHubCrawler`, meant to be processed by an
-            instance of :class:`bitshift.crawler.indexer.GitIndexer`.
+        :param clone_queue: see :attr:`self.clone_queue`
 
-            .. code-block:: python
-                sample_dict = {
-                    "url" : "https://github.com/user/repo",
-                    "name" : "repo",
-                    "framework_name" : "GitHub"
-                }
-
-        :type repository_queue: :class:`Queue.Queue`
+        :type clone_queue: see :attr:`self.clone_queue`
         """
 
-        self.repository_queue = repository_queue
+        self.clone_queue = clone_queue
 
         logging.info("Starting.")
         super(GitHubCrawler, self).__init__(name=self.__class__.__name__)
@@ -54,7 +45,8 @@ class GitHubCrawler(threading.Thread):
         Pull all of GitHub's repositories by making calls to its API in a
         loop, accessing a subsequent page of results via the "next" URL
         returned in an API response header. Uses Severyn Kozak's (sevko)
-        authentication credentials.
+        authentication credentials. For every new repository, a
+        :class:`GitRepository` is inserted into :attr:`self.clone_queue`.
         """
 
         next_api_url = "https://api.github.com/repositories"
@@ -67,18 +59,21 @@ class GitHubCrawler(threading.Thread):
         while len(next_api_url) > 0:
             start_time = time.time()
             response = requests.get(next_api_url, params=authentication_params)
-            logging.info("API call made. Limit remaining: %s." %
-                    response.headers["x-ratelimit-remaining"])
+
+            queue_percent_full = (float(self.clone_queue.qsize()) /
+                    self.clone_queue.maxsize) * 100
+            logging.info("API call made. Limit remaining: %s. Queue-size: (%d"
+                    "%%) %d/%d" % (response.headers["x-ratelimit-remaining"],
+                    queue_percent_full, self.clone_queue.qsize(),
+                    self.clone_queue.maxsize))
 
             for repo in response.json():
-                while self.repository_queue.full():
-                    pass
-
-                self.repository_queue.put({
-                    "url" : repo["html_url"],
-                    "name" : repo["name"],
-                    "framework_name" : "GitHub"
-                })
+                while self.clone_queue.full():
+                    time.sleep(1)
+
+                self.clone_queue.put(indexer.GitRepository(
+                        repo["html_url"], repo["full_name"].replace("/", ""),
+                        "GitHub"))
 
             if int(response.headers["x-ratelimit-remaining"]) == 0:
                 time.sleep(int(response.headers["x-ratelimit-reset"]) -
diff --git a/bitshift/crawler/indexer.py b/bitshift/crawler/indexer.py
index b1e8e34..7e82bb5 100644
--- a/bitshift/crawler/indexer.py
+++ b/bitshift/crawler/indexer.py
@@ -3,59 +3,171 @@
 repositories.
 """
 
-import bs4, logging, os, re, shutil, subprocess, threading
+import bs4, logging, os, Queue, re, shutil, subprocess, time, threading
 
 from ..database import Database
 from ..codelet import Codelet
 
+import pymongo #debug
+db = pymongo.MongoClient().bitshift #debug
+
 GIT_CLONE_DIR = "/tmp/bitshift"
+THREAD_QUEUE_SLEEP = 0.5
+
+class GitRepository(object):
+    """
+    A representation of a Git repository's metadata.
+
+    :ivar url: (str) The repository's URL.
+    :ivar name: (str) The name of the repository.
+    :ivar framework_name: (str) The name of the online Git framework that the
+        repository belongs to (e.g., GitHub, BitBucket).
+    """
+
+    def __init__(self, url, name, framework_name):
+        """
+        Create a GitRepository instance.
+
+        :param url: see :attr:`GitRepository.url`
+        :param name: see :attr:`GitRepository.name`
+        :param framework_name: see :attr:`GitRepository.framework_name`
+
+        :type url: str
+        :type name: str
+        :type framework_name: str
+        """
+
+        self.url = url
+        self.name = name
+        self.framework_name = framework_name
 
 class GitIndexer(threading.Thread):
     """
     A singleton Git repository indexer.
 
-    `GitIndexer` clones and indexes the repositories at urls found by the
-    :mod:`bitshift.crawler.crawler` Git crawlers.
+    :class:`GitIndexer` indexes the repositories cloned by the
+    :class:`_GitCloner` singleton.
 
-    :ivar repository_queue: (:class:`Queue.Queue`) A queue containing urls found
-        by the :mod:`bitshift.crawler.crawler` Git crawlers.
+    :ivar index_queue: (:class:`Queue.Queue`) A queue containing
+        :class:`GitRepository` objects for every new repository successfully
+        cloned by :class:`_GitCloner`, which are to be indexed.
+    :ivar git_cloner: (:class:`_GitCloner`) The corresponding repository
+        cloner, which feeds :class:`GitIndexer`.
     """
 
-    def __init__(self, repository_queue):
+    def __init__(self, clone_queue):
         """
         Create an instance of the singleton `GitIndexer`.
 
-        :param repository_queue: see :attr:`GitIndexer.repository_queue`
+        :param clone_queue: see
+            :attr:`bitshift.crawler.crawler.GitHubCrawler.clone_queue`
 
-        :type repository_queue: see :attr:`GitIndexer.repository_queue`
+        :type clone_queue: see
+            :attr:`bitshift.crawler.crawler.GitHubCrawler.clone_queue`
         """
 
-        self.repository_queue = repository_queue
+        MAX_INDEX_QUEUE_SIZE = 10
+
+        logging.info("Starting.")
+        self.index_queue = Queue.Queue(maxsize=MAX_INDEX_QUEUE_SIZE)
+        self.git_cloner = _GitCloner(clone_queue, self.index_queue)
+        self.git_cloner.start()
 
         if not os.path.exists(GIT_CLONE_DIR):
             os.makedirs(GIT_CLONE_DIR)
 
-        logging.info("Starting.")
         super(GitIndexer, self).__init__(name=self.__class__.__name__)
 
     def run(self):
         """
-        Retrieve new repository urls, clone, and index them.
+        Retrieve metadata about newly cloned repositories and index them.
+
+        Blocks until new repositories appear in :attr:`self.index_queue`, then
+        retrieves one and attempts to index it. Should any errors occur, the
+        new repository will be discarded and the indexer will index the next
+        in the queue.
+        """
+
+        while True:
+            while self.index_queue.empty():
+                logging.warning("Empty.")
+                time.sleep(THREAD_QUEUE_SLEEP)
+
+            repo = self.index_queue.get()
+            self.index_queue.task_done()
+            _index_repository(repo.url, repo.name, repo.framework_name)
+
+class _GitCloner(threading.Thread):
+    """
+    A singleton Git repository cloner.
+
+    :ivar clone_queue: (:class:`Queue.Queue`) see
+        :attr:`bitshift.crawler.crawler.GitHubCrawler.clone_queue`.
+    :ivar index_queue: (:class:`Queue.Queue`) see
+        :attr:`GitIndexer.index_queue`.
+    """
+
+    def __init__(self, clone_queue, index_queue):
+        """
+        Create an instance of the singleton :class:`_GitCloner`.
+
+        :param clone_queue: see :attr:`self.clone_queue`
+        :param index_queue: see :attr:`self.index_queue`
+
+        :type clone_queue: see :attr:`self.clone_queue`
+        :type index_queue: see :attr:`self.index_queue`
+        """
+
+        self.clone_queue = clone_queue
+        self.index_queue = index_queue
+        super(_GitCloner, self).__init__(name=self.__class__.__name__)
+
+    def run(self):
+        """
+        Retrieve metadata about newly crawled repositories and clone them.
 
-        Blocks until new urls appear in :attr:`GitIndexer.repository_queue`,
-        then retrieves one, and attempts cloning/indexing it. Should any errors
-        occur, the new repository will be discarded and the crawler will
-        index the next in the queue.
+        Blocks until new :class:`GitRepository` objects appear in
+        :attr:`self.clone_queue`, then attempts to clone them. If successful,
+        the cloned repository is added to :attr:`self.index_queue` for the
+        :class:`GitIndexer` to index; otherwise, it is discarded.
         """
 
         while True:
-            while self.repository_queue.empty():
-                pass
+            while self.clone_queue.empty():
+                time.sleep(THREAD_QUEUE_SLEEP)
+            repo = self.clone_queue.get()
+            self.clone_queue.task_done()
+            self._clone_repository(repo)
 
-            repo = self.repository_queue.get()
-            self.repository_queue.task_done()
-            _index_repository(repo["url"], repo["name"],
-                    repo["framework_name"])
+    def _clone_repository(self, repo):
+        """
+        Attempt to clone a Git repository.
+
+        :param repo: Metadata about the repository to clone.
+
+        :type repo: :class:`GitRepository`
+        """
+
+        GIT_CLONE_TIMEOUT = 500
+
+        queue_percent_full = (float(self.index_queue.qsize()) /
+                self.index_queue.maxsize) * 100
+        logging.info("Cloning %s. Queue-size: (%d%%) %d/%d" % (repo.url,
+                queue_percent_full, self.index_queue.qsize(),
+                self.index_queue.maxsize))
+
+        with _ChangeDir(GIT_CLONE_DIR) as git_clone_dir:
+            if subprocess.call("perl -e 'alarm shift @ARGV; exec @ARGV' %d git"
+                    " clone %s %s" % (GIT_CLONE_TIMEOUT, repo.url, repo.name),
+                    shell=True) != 0:
+                logging.debug("_clone_repository(): Cloning %s failed." %
+                        repo.url)
+                if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo.name)):
+                    shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo.name))
+                return
+
+        while self.index_queue.full():
+            time.sleep(THREAD_QUEUE_SLEEP)
+
+        self.index_queue.put(repo)
 
 class _ChangeDir(object):
     """
@@ -111,27 +223,17 @@ def _index_repository(repo_url, repo_name, framework_name):
     :type framework_name: str
     """
 
-    GIT_CLONE_TIMEOUT = 600
-
     logging.info("Indexing repository %s." % repo_url)
 
-    with _ChangeDir(GIT_CLONE_DIR) as git_clone_dir:
-        if subprocess.call("perl -e 'alarm shift @ARGV; exec @ARGV' %d git \
-                clone %s" % (GIT_CLONE_TIMEOUT, repo_url), shell=True) != 0:
-            logging.debug("_index_repository(): Cloning %s failed." % repo_url)
-            if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo_name)):
-                shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo_name))
-            return
-
-        with _ChangeDir(repo_name) as repository_dir:
-            try:
-                _insert_repository_codelets(repo_url, repo_name,
-                        framework_name)
-            except Exception as exception:
-                logging.warning("%s: _insert_repository_codelets"
-                        " failed %s." % (exception, repo_url))
-                pass
-
-        shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo_name))
+    with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo_name)) as repository_dir:
+        try:
+            _insert_repository_codelets(repo_url, repo_name,
+                    framework_name)
+        except Exception as exception:
+            logging.warning("%s: _insert_repository_codelets failed %s." %
+                    (exception, repo_url))
+
+    if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo_name)):
+        shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo_name))
 
 def _insert_repository_codelets(repo_url, repo_name, framework_name):
     """
@@ -164,6 +266,9 @@ def _insert_repository_codelets(repo_url, repo_name, framework_name):
                 framework_name), commits_meta[filename]["time_created"],
                 commits_meta[filename]["time_last_modified"])
 
+        db.codelets.insert({
+            "name" : codelet.name
+        })
 
         # Database.insert(codelet)
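(Post-patch note: the `perl -e 'alarm shift @ARGV; exec @ARGV'` wrapper exists
because Python 2's `subprocess.call()` has no timeout parameter; perl sets an
alarm, exec()s `git clone` in its place, and the pending SIGALRM kills the
clone after `GIT_CLONE_TIMEOUT` seconds. A rough pure-Python sketch of the
same guard, for illustration only; the helper's name and the 0.5-second
polling interval are assumptions:)

    import shlex, subprocess, time

    def call_with_timeout(command, timeout):
        """Run `command`, killing it if it outlives `timeout` seconds.

        Returns the process' exit code, or None if it had to be killed.
        """
        process = subprocess.Popen(shlex.split(command))
        start_time = time.time()
        while process.poll() is None:
            if time.time() - start_time > timeout:
                process.kill()
                return None
            time.sleep(0.5)  # assumed polling interval
        return process.returncode

    # Usage, mirroring _GitCloner._clone_repository():
    # call_with_timeout("git clone %s %s" % (repo.url, repo.name), 500)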