Browse Source

Add threaded cloner, GitRepository class (#7).

Add:
    bitshift/crawler/
        (crawler, indexer).py
            -add a 'time.sleep()' call whenever a thread is blocking on items
            in a Queue, to prevent excessive polling (which hogs system
            resources).

        indexer.py
            -move 'git clone' functionality from the 'GitIndexer' singleton to
            a separate, threaded '_GitCloner'.
            -'crawler.GitHubCrawler' now shares a "clone" queue with
            '_GitCloner', which shares an "index" queue with 'GitIndexer'.
            -both indexing and cloning are time-intensive processes, so this
            improvement should (hypothetically) boost performance.
            -add `GitRepository` class, instances of which are passed around in
            the queues.
tags/v1.0^2
Severyn Kozak 10 years ago
parent
commit
3ce399adbf
2 changed files with 169 additions and 69 deletions
  1. +23
    -28
      bitshift/crawler/crawler.py
  2. +146
    -41
      bitshift/crawler/indexer.py

+ 23
- 28
bitshift/crawler/crawler.py View File

@@ -6,7 +6,7 @@ Contains all website/framework-specific Class crawlers.


import logging, requests, time, threading import logging, requests, time, threading


import bitshift.crawler.indexer
from bitshift.crawler import indexer


from ..codelet import Codelet from ..codelet import Codelet
from ..database import Database from ..database import Database
@@ -19,31 +19,22 @@ class GitHubCrawler(threading.Thread):
to its public repositories, which it inserts into a :class:`Queue.Queue` to its public repositories, which it inserts into a :class:`Queue.Queue`
shared with :class:`bitshift.crawler.indexer.GitIndexer`. shared with :class:`bitshift.crawler.indexer.GitIndexer`.


:ivar repository_queue: (:class:`Queue.Queue`) Contains dictionaries with
repository information retrieved by `GitHubCrawler`, and other Git
crawlers, to be processed by
:ivar clone_queue: (:class:`Queue.Queue`) Contains :class:`GitRepository`
with repository metadata retrieved by :class:`GitHubCrawler`, and other
Git crawlers, to be processed by
:class:`bitshift.crawler.indexer.GitIndexer`. :class:`bitshift.crawler.indexer.GitIndexer`.
""" """


def __init__(self, repository_queue):
def __init__(self, clone_queue):
""" """
Create an instance of the singleton `GitHubCrawler`. Create an instance of the singleton `GitHubCrawler`.


:param repository_queue: A queue containing dictionaries of repository
metadata retrieved by `GitHubCrawler`, meant to be processed by an
instance of :class:`bitshift.crawler.indexer.GitIndexer`.
:param clone_queue: see :attr:`self.clone_queue`


.. code-block:: python
sample_dict = {
"url" : "https://github.com/user/repo",
"name" : "repo",
"framework_name" : "GitHub"
}

:type repository_queue: :class:`Queue.Queue`
:type clone_queue: see :attr:`self.clone_queue`
""" """


self.repository_queue = repository_queue
self.clone_queue = clone_queue
logging.info("Starting.") logging.info("Starting.")
super(GitHubCrawler, self).__init__(name=self.__class__.__name__) super(GitHubCrawler, self).__init__(name=self.__class__.__name__)


@@ -54,7 +45,8 @@ class GitHubCrawler(threading.Thread):
Pull all of GitHub's repositories by making calls to its API in a loop, Pull all of GitHub's repositories by making calls to its API in a loop,
accessing a subsequent page of results via the "next" URL returned in an accessing a subsequent page of results via the "next" URL returned in an
API response header. Uses Severyn Kozak's (sevko) authentication API response header. Uses Severyn Kozak's (sevko) authentication
credentials.
credentials. For every new repository, a :class:`GitRepository` is
inserted into :attr:`self.clone_queue`.
""" """


next_api_url = "https://api.github.com/repositories" next_api_url = "https://api.github.com/repositories"
@@ -67,18 +59,21 @@ class GitHubCrawler(threading.Thread):
while len(next_api_url) > 0: while len(next_api_url) > 0:
start_time = time.time() start_time = time.time()
response = requests.get(next_api_url, params=authentication_params) response = requests.get(next_api_url, params=authentication_params)
logging.info("API call made. Limit remaining: %s." %
response.headers["x-ratelimit-remaining"])

queue_percent_full = (float(self.clone_queue.qsize()) /
self.clone_queue.maxsize) * 100
logging.info("API call made. Limit remaining: %s. Queue-size: (%d"
"%%) %d/%d" % (response.headers["x-ratelimit-remaining"],
queue_percent_full, self.clone_queue.qsize(),
self.clone_queue.maxsize))


for repo in response.json(): for repo in response.json():
while self.repository_queue.full():
pass

self.repository_queue.put({
"url" : repo["html_url"],
"name" : repo["name"],
"framework_name" : "GitHub"
})
while self.clone_queue.full():
time.sleep(1)

self.clone_queue.put(indexer.GitRepository(
repo["html_url"], repo["full_name"].replace("/", ""),
"GitHub"))


if int(response.headers["x-ratelimit-remaining"]) == 0: if int(response.headers["x-ratelimit-remaining"]) == 0:
time.sleep(int(response.headers["x-ratelimit-reset"]) - time.sleep(int(response.headers["x-ratelimit-reset"]) -


+ 146
- 41
bitshift/crawler/indexer.py View File

@@ -3,59 +3,171 @@
repositories. repositories.
""" """


import bs4, logging, os, re, shutil, subprocess, threading
import bs4, logging, os, Queue, re, shutil, subprocess, time, threading


from ..database import Database from ..database import Database
from ..codelet import Codelet from ..codelet import Codelet


import pymongo #debug
db = pymongo.MongoClient().bitshift #debug

GIT_CLONE_DIR = "/tmp/bitshift" GIT_CLONE_DIR = "/tmp/bitshift"
THREAD_QUEUE_SLEEP = 0.5

class GitRepository(object):
    """
    Metadata describing a single Git repository.

    :ivar url: (str) The address the repository is hosted at.
    :ivar name: (str) The repository's name.
    :ivar framework_name: (str) The name of the online Git framework hosting
        the repository (eg, GitHub, BitBucket).
    """

    def __init__(self, url, name, framework_name):
        """
        Initialize a GitRepository with the given metadata.

        :param url: see :attr:`GitRepository.url`
        :param name: see :attr:`GitRepository.name`
        :param framework_name: see :attr:`GitRepository.framework_name`

        :type url: str
        :type name: str
        :type framework_name: str
        """

        # Plain attribute assignment; instances are simple value carriers
        # passed between the crawler/cloner/indexer queues.
        self.framework_name = framework_name
        self.name = name
        self.url = url


class GitIndexer(threading.Thread): class GitIndexer(threading.Thread):
""" """
A singleton Git repository indexer. A singleton Git repository indexer.


`GitIndexer` clones and indexes the repositories at urls found by the
:mod:`bitshift.crawler.crawler` Git crawlers.
:class:`GitIndexer` indexes the repositories cloned by the
:class:`_GitCloner` singleton.


:ivar repository_queue: (:class:`Queue.Queue`) A queue containing urls found
by the :mod:`bitshift.crawler.crawler` Git crawlers.
:ivar index_queue: (:class:`Queue.Queue`) A queue containing
:class:`GitRepository` objects for every new repository successfully
cloned by :class:`_GitCloner`, which are to be indexed.
:ivar git_cloner: (:class:`_GitCloner`) The corresponding repository cloner,
which feeds :class:`GitIndexer`.
""" """


def __init__(self, repository_queue):
def __init__(self, clone_queue):
""" """
Create an instance of the singleton `GitIndexer`. Create an instance of the singleton `GitIndexer`.


:param repository_queue: see :attr:`GitIndexer.repository_queue`
:param clone_queue: see :attr:`self.index_queue`


:type repository_queue: see :attr:`GitIndexer.repository_queue`
:type index_queue: see :attr:`self.index_queue`
""" """


self.repository_queue = repository_queue
MAX_INDEX_QUEUE_SIZE = 10

logging.info("Starting.")
self.index_queue = Queue.Queue(maxsize=MAX_INDEX_QUEUE_SIZE)
self.git_cloner = _GitCloner(clone_queue, self.index_queue)
self.git_cloner.start()


if not os.path.exists(GIT_CLONE_DIR): if not os.path.exists(GIT_CLONE_DIR):
os.makedirs(GIT_CLONE_DIR) os.makedirs(GIT_CLONE_DIR)


logging.info("Starting.")
super(GitIndexer, self).__init__(name=self.__class__.__name__) super(GitIndexer, self).__init__(name=self.__class__.__name__)


def run(self): def run(self):
""" """
Retrieve new repository urls, clone, and index them.
Retrieve metadata about newly cloned repositories and index them.

Blocks until new repositories appear in :attr:`self.index_queue`, then
retrieves one, and attempts indexing it. Should any errors occur, the
new repository will be discarded and the indexer will index the next in
the queue.
"""

while True:
while self.index_queue.empty():
logging.warning("Empty.")
time.sleep(THREAD_QUEUE_SLEEP)

repo = self.index_queue.get()
self.index_queue.task_done()
_index_repository(repo.url, repo.name, repo.framework_name)

class _GitCloner(threading.Thread):
"""
A singleton Git repository cloner.

:ivar clone_queue: (:class:`Queue.Queue`) see
:attr:`bitshift.crawler.crawler.GitHubCrawler.clone_queue`.
:ivar index_queue: (:class:`Queue.Queue`) see
:attr:`GitIndexer.index_queue`.
"""

def __init__(self, clone_queue, index_queue):
    """
    Initialize the singleton :class:`_GitCloner` thread.

    :param clone_queue: see :attr:`self.clone_queue`
    :param index_queue: see :attr:`self.index_queue`

    :type clone_queue: see :attr:`self.clone_queue`
    :type index_queue: see :attr:`self.index_queue`
    """

    # Name the thread after the class for readable log output.
    super(_GitCloner, self).__init__(name=self.__class__.__name__)
    self.clone_queue = clone_queue
    self.index_queue = index_queue

def run(self):
"""
Retrieve metadata about newly crawled repositories and clone them.


Blocks until new urls appear in :attr:`GitIndexer.repository_queue`,
then retrieves one, and attempts cloning/indexing it. Should any errors
occur, the new repository will be discarded and the crawler will
index the next in the queue.
Blocks until new :class:`GitRepository` appear in
:attr:`self.clone_queue`, then attempts cloning them. If
successful, the cloned repository is added to :attr:`self.index_queue`
for the `GitIndexer` to index; otherwise, it is discarded.
""" """


while True: while True:
while self.repository_queue.empty():
pass
while self.clone_queue.empty():
time.sleep(THREAD_QUEUE_SLEEP)
repo = self.clone_queue.get()
self.clone_queue.task_done()
self._clone_repository(repo)


repo = self.repository_queue.get()
self.repository_queue.task_done()
_index_repository(repo["url"], repo["name"],
repo["framework_name"])
def _clone_repository(self, repo):
    """
    Attempt cloning a Git repository.

    :param repo: Metadata about the repository to clone.

    :type repo: :class:`GitRepository`
    """

    # Maximum number of seconds a single `git clone` may run before it is
    # killed (enforced via the perl alarm wrapper below).
    GIT_CLONE_TIMEOUT = 500

    # Log how full the shared index queue is, so queue back-pressure is
    # visible in the logs.
    queue_percent_full = (float(self.index_queue.qsize()) /
        self.index_queue.maxsize) * 100
    logging.info("Cloning %s. Queue-size: (%d%%) %d/%d" % (repo.url,
        queue_percent_full, self.index_queue.qsize(),
        self.index_queue.maxsize))

    with _ChangeDir(GIT_CLONE_DIR) as git_clone_dir:
        # `perl -e 'alarm shift @ARGV; exec @ARGV'` sends SIGALRM after
        # GIT_CLONE_TIMEOUT seconds, killing a hung/slow `git clone`.
        # NOTE(review): repo.url/repo.name are interpolated into a
        # shell=True command string — assumes crawler-supplied values are
        # shell-safe; confirm upstream sanitization.
        if subprocess.call("perl -e 'alarm shift @ARGV; exec @ARGV' %d git"
                " clone %s %s" % (GIT_CLONE_TIMEOUT, repo.url, repo.name),
                shell=True) != 0:
            # Clone failed (or timed out): remove any partial checkout and
            # discard this repository.
            logging.debug("_clone_repository(): Cloning %s failed." %
                    repo.url)
            if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo.name)):
                shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo.name))
            return

        # Block (politely, with a sleep) until the indexer has room, then
        # hand the cloned repository off for indexing.
        while self.index_queue.full():
            time.sleep(THREAD_QUEUE_SLEEP)

        self.index_queue.put(repo)


class _ChangeDir(object): class _ChangeDir(object):
""" """
@@ -111,27 +223,17 @@ def _index_repository(repo_url, repo_name, framework_name):
:type framework_name: str :type framework_name: str
""" """


GIT_CLONE_TIMEOUT = 600

logging.info("Indexing repository %s." % repo_url) logging.info("Indexing repository %s." % repo_url)
with _ChangeDir(GIT_CLONE_DIR) as git_clone_dir:
if subprocess.call("perl -e 'alarm shift @ARGV; exec @ARGV' %d git \
clone %s" % (GIT_CLONE_TIMEOUT, repo_url), shell=True) != 0:
logging.debug("_index_repository(): Cloning %s failed." % repo_url)
if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo_name)):
shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo_name))
return

with _ChangeDir(repo_name) as repository_dir:
try:
_insert_repository_codelets(repo_url, repo_name,
framework_name)
except Exception as exception:
logging.warning("%s: _insert_repository_codelets"
" failed %s." % (exception, repo_url))
pass

shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo_name))
with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo_name)) as repository_dir:
try:
_insert_repository_codelets(repo_url, repo_name,
framework_name)
except Exception as exception:
logging.warning("%s: _insert_repository_codelets failed %s." %
(exception, repo_url))

if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo_name)):
shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo_name))


def _insert_repository_codelets(repo_url, repo_name, framework_name): def _insert_repository_codelets(repo_url, repo_name, framework_name):
""" """
@@ -164,6 +266,9 @@ def _insert_repository_codelets(repo_url, repo_name, framework_name):
framework_name), framework_name),
commits_meta[filename]["time_created"], commits_meta[filename]["time_created"],
commits_meta[filename]["time_last_modified"]) commits_meta[filename]["time_last_modified"])
db.codelets.insert({
"name" : codelet.name
})


# Database.insert(codelet) # Database.insert(codelet)




Loading…
Cancel
Save