Browse files

Make it easy to stop crawler/parsers. Cleanup.

tags/v1.0^2
Ben Kurtovic 10 years ago
parent
commit
1015298109
3 changed files with 72 additions and 45 deletions
  1. +21 -5    bitshift/crawler/crawl.py
  2. +19 -19   bitshift/crawler/crawler.py
  3. +32 -21   bitshift/crawler/indexer.py
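The mechanism is a single shared threading.Event: crawl() creates and sets it, hands it to every crawler and indexer thread, each worker's main loop now tests run_event.is_set() instead of spinning forever, and a KeyboardInterrupt in the main thread clears the event and joins the workers. A minimal, self-contained sketch of the same pattern (the Worker class and its body are illustrative stand-ins, not code from this commit):

    import time
    import threading

    class Worker(threading.Thread):
        """Hypothetical stand-in for GitHubCrawler/BitbucketCrawler/GitIndexer."""

        def __init__(self, run_event):
            super(Worker, self).__init__()
            self.run_event = run_event

        def run(self):
            # The loop condition is the entire shutdown mechanism: once the
            # main thread clears the event, the loop ends and join() returns.
            while self.run_event.is_set():
                time.sleep(0.1)  # stand-in for one unit of real work

    if __name__ == "__main__":
        run_event = threading.Event()
        run_event.set()
        workers = [Worker(run_event) for _ in range(3)]
        for worker in workers:
            worker.start()
        try:
            while 1:
                time.sleep(0.1)  # keep the main thread free to catch Ctrl-C
        except KeyboardInterrupt:
            run_event.clear()  # signal every worker to stop after this pass
            for worker in workers:
                worker.join()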

bitshift/crawler/crawl.py  +21 -5  View file

@@ -4,7 +4,12 @@
 Contains functions for initializing all subsidiary, threaded crawlers.
 """
 
-import logging, logging.handlers, os, Queue
+import logging
+import logging.handlers
+import os
+import Queue
+import time
+from threading import Event
 
 from bitshift.crawler import crawler, indexer
 from bitshift.parser import start_parse_servers
@@ -26,15 +31,26 @@ def crawl():
     MAX_URL_QUEUE_SIZE = 5e3
 
     repo_clone_queue = Queue.Queue(maxsize=MAX_URL_QUEUE_SIZE)
-    threads = [crawler.GitHubCrawler(repo_clone_queue),
-               crawler.BitbucketCrawler(repo_clone_queue),
-               indexer.GitIndexer(repo_clone_queue)]
+    run_event = Event()
+    run_event.set()
+    threads = [crawler.GitHubCrawler(repo_clone_queue, run_event),
+               crawler.BitbucketCrawler(repo_clone_queue, run_event),
+               indexer.GitIndexer(repo_clone_queue, run_event)]
 
     for thread in threads:
         thread.start()
 
     parse_servers = start_parse_servers()
 
+    try:
+        while 1:
+            time.sleep(0.1)
+    except KeyboardInterrupt:
+        run_event.clear()
+        for thread in threads:
+            thread.join()
+        for server in parse_servers:
+            server.kill()
+
 def _configure_logging():
     # This isn't ideal, since it means the bitshift python package must be kept
     # inside the app, but it works for now:
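The sleep loop ahead of the KeyboardInterrupt handler is deliberate: in CPython 2 only the main thread receives the interrupt, and a Thread.join() with no timeout can block it uninterruptibly, so the main thread naps in short intervals until Ctrl-C arrives and only calls join() after run_event is cleared, when the joins can return quickly. An equivalent idiom (a sketch only, reusing the threads and run_event names from crawl() above) polls the joins themselves:

    # Poll join() with a timeout so the main thread keeps waking up
    # often enough to receive KeyboardInterrupt.
    try:
        while any(thread.is_alive() for thread in threads):
            for thread in threads:
                thread.join(0.1)
    except KeyboardInterrupt:
        run_event.clear()
        for thread in threads:
            thread.join()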


bitshift/crawler/crawler.py  +19 -19  View file

@@ -4,12 +4,13 @@
 Contains all website/framework-specific Class crawlers.
 """
 
-import logging, requests, time, threading
+import logging
+import time
+import threading
 
-from bitshift.crawler import indexer
+import requests
 
+from ..codelet import Codelet
+from ..database import Database
+from . import indexer
 
 class GitHubCrawler(threading.Thread):
     """
@@ -30,7 +31,7 @@ class GitHubCrawler(threading.Thread):
         "client_secret" : "8deeefbc2439409c5b7a092fd086772fe8b1f24e"
     }
 
-    def __init__(self, clone_queue):
+    def __init__(self, clone_queue, run_event):
         """
         Create an instance of the singleton `GitHubCrawler`.
 
@@ -40,6 +41,7 @@ class GitHubCrawler(threading.Thread):
         """
 
         self.clone_queue = clone_queue
+        self.run_event = run_event
         self._logger = logging.getLogger("%s.%s" %
                 (__name__, self.__class__.__name__))
         self._logger.info("Starting.")
@@ -59,14 +61,13 @@ class GitHubCrawler(threading.Thread):
         next_api_url = "https://api.github.com/repositories"
         api_request_interval = 5e3 / 60 ** 2
 
-        while len(next_api_url) > 0:
+        while next_api_url and self.run_event.is_set():
             start_time = time.time()
 
             try:
                 resp = requests.get(next_api_url, params=self.AUTHENTICATION)
-            except ConnectionError as excep:
-                self._logger.warning("API %s call failed: %s: %s",
-                        next_api_url, excep.__class__.__name__, excep)
+            except requests.ConnectionError:
+                self._logger.exception("API %s call failed:" % next_api_url)
                 time.sleep(0.5)
                 continue
@@ -169,7 +170,7 @@ class BitbucketCrawler(threading.Thread):
     :ivar _logger: (:class:`logging.Logger`) A class-specific logger object.
     """
 
-    def __init__(self, clone_queue):
+    def __init__(self, clone_queue, run_event):
         """
         Create an instance of the singleton `BitbucketCrawler`.
 
@@ -179,6 +180,7 @@ class BitbucketCrawler(threading.Thread):
         """
 
         self.clone_queue = clone_queue
+        self.run_event = run_event
         self._logger = logging.getLogger("%s.%s" %
                 (__name__, self.__class__.__name__))
         self._logger.info("Starting.")
@@ -186,7 +188,7 @@ class BitbucketCrawler(threading.Thread):
 
     def run(self):
         """
-        Query the Bitbucket API for data about every public repository.
+        Query the Bitbucket API for data about every public repository.
 
         Query the Bitbucket API's "/repositories" endpoint and read its
         paginated responses in a loop; any "git" repositories have their
@@ -196,13 +198,12 @@ class BitbucketCrawler(threading.Thread):
 
         next_api_url = "https://api.bitbucket.org/2.0/repositories"
 
-        while True:
+        while self.run_event.is_set():
             try:
                 response = requests.get(next_api_url).json()
-            except ConnectionError as exception:
+            except requests.ConnectionError:
+                self._logger.exception("API %s call failed:", next_api_url)
                 time.sleep(0.5)
-                self._logger.warning("API %s call failed: %s: %s",
-                        next_api_url, excep.__class__.__name__, excep)
                 continue
 
             queue_percent_full = (float(self.clone_queue.qsize()) /
@@ -220,16 +221,15 @@ class BitbucketCrawler(threading.Thread):
                 clone_url = (clone_links[0]["href"] if
                              clone_links[0]["name"] == "https" else
                              clone_links[1]["href"])
-                links.append("clone_url")
 
                 try:
                     watchers = requests.get(
                             repo["links"]["watchers"]["href"])
                     rank = len(watchers.json()["values"]) / 100
-                except ConnectionError as exception:
+                except requests.ConnectionError:
+                    err = "API %s call failed:" % next_api_url
+                    self._logger.exception(err)
                     time.sleep(0.5)
-                    self._logger.warning("API %s call failed: %s: %s",
-                            next_api_url, excep.__class__.__name__, excep)
                     continue
 
                 self.clone_queue.put(indexer.GitRepository(
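Beyond the shutdown flag, the exception handlers here fix a real bug: ConnectionError is not a builtin in Python 2, so the old `except ConnectionError` clauses would have raised a NameError the first time a request actually failed (and the old Bitbucket handler bound the exception as `exception` but then logged `excep`). Catching requests.ConnectionError, the class requests really raises for network failures, and logging via Logger.exception(), which appends the active traceback, is the idiomatic form. A minimal sketch of the corrected shape, with a hypothetical fetch helper:

    import time
    import logging

    import requests

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    def fetch_json(url):
        """Hypothetical helper mirroring the crawlers' request loops."""
        while True:
            try:
                return requests.get(url).json()
            except requests.ConnectionError:
                # Logger.exception() logs the message plus the traceback
                # of the exception currently being handled.
                logger.exception("API %s call failed:", url)
                time.sleep(0.5)

    # fetch_json("https://api.bitbucket.org/2.0/repositories")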


bitshift/crawler/indexer.py  +32 -21  View file

@@ -3,8 +3,17 @@
 repositories.
 """
 
-import bs4, datetime, logging, os, Queue, re, shutil, string, subprocess, time,\
-    threading
+import datetime
+import logging
+import os
+import Queue
+import shutil
+import string
+import subprocess
+import time
+import threading
+
+import bs4
 
 from ..database import Database
 from ..parser import parse, UnsupportedFileError
@@ -60,7 +69,7 @@ class GitIndexer(threading.Thread):
     :ivar _logger: (:class:`logging.Logger`) A class-specific logger object.
     """
 
-    def __init__(self, clone_queue):
+    def __init__(self, clone_queue, run_event):
         """
         Create an instance of the singleton `GitIndexer`.
 
@@ -72,7 +81,8 @@ class GitIndexer(threading.Thread):
         MAX_INDEX_QUEUE_SIZE = 10
 
         self.index_queue = Queue.Queue(maxsize=MAX_INDEX_QUEUE_SIZE)
-        self.git_cloner = _GitCloner(clone_queue, self.index_queue)
+        self.run_event = run_event
+        self.git_cloner = _GitCloner(clone_queue, self.index_queue, run_event)
         self.git_cloner.start()
         self.database = Database()
         self._logger = logging.getLogger("%s.%s" %
@@ -94,7 +104,7 @@ class GitIndexer(threading.Thread):
         the queue.
         """
 
-        while True:
+        while self.run_event.is_set():
             while self.index_queue.empty():
                 time.sleep(THREAD_QUEUE_SLEEP)
 
@@ -114,10 +124,10 @@ class GitIndexer(threading.Thread):
         :type repo_url: :class:`GitRepository`
         """
 
-        with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo.name)) as repository_dir:
+        with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo.name)):
             try:
                 self._insert_repository_codelets(repo)
-            except Exception as excep:
+            except Exception:
                 self._logger.exception("Exception raised while indexing:")
             finally:
                 if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo.name)):
@@ -192,16 +202,16 @@ class GitIndexer(threading.Thread):
 
         try:
             if framework_name == "GitHub":
-                default_branch = subprocess.check_output("git branch"
-                        " --no-color", shell=True)[2:-1]
-                return ("%s/blob/%s/%s" % (repo_url, default_branch,
-                        filename)).replace("//", "/")
+                default_branch = subprocess.check_output("git branch"
+                        " --no-color", shell=True)[2:-1]
+                return ("%s/blob/%s/%s" % (repo_url, default_branch,
+                        filename)).replace("//", "/")
             elif framework_name == "Bitbucket":
-                commit_hash = subprocess.check_output("git rev-parse HEAD",
-                        shell=True).replace("\n", "")
-                return ("%s/src/%s/%s" % (repo_url, commit_hash,
-                        filename)).replace("//", "/")
-        except subprocess.CalledProcessError as exception:
+                commit_hash = subprocess.check_output("git rev-parse HEAD",
+                        shell=True).replace("\n", "")
+                return ("%s/src/%s/%s" % (repo_url, commit_hash,
+                        filename)).replace("//", "/")
+        except subprocess.CalledProcessError:
             return None
 
     def _get_git_commits(self):
@@ -360,7 +370,7 @@ class GitIndexer(threading.Thread):
             non_ascii = file_snippet.translate(null_trans, ascii_characters)
             return not float(len(non_ascii)) / len(file_snippet) > 0.30
 
-        except IOError as exception:
+        except IOError:
            return False
 
 class _GitCloner(threading.Thread):
@@ -377,7 +387,7 @@ class _GitCloner(threading.Thread):
     :ivar _logger: (:class:`logging.Logger`) A class-specific logger object.
     """
 
-    def __init__(self, clone_queue, index_queue):
+    def __init__(self, clone_queue, index_queue, run_event):
         """
         Create an instance of the singleton :class:`_GitCloner`.
 
@@ -390,6 +400,7 @@ class _GitCloner(threading.Thread):
 
         self.clone_queue = clone_queue
         self.index_queue = index_queue
+        self.run_event = run_event
         self._logger = logging.getLogger("%s.%s" %
                 (__name__, self.__class__.__name__))
         self._logger.info("Starting.")
@@ -405,7 +416,7 @@ class _GitCloner(threading.Thread):
         for the `GitIndexer` to clone; otherwise, it is discarded.
         """
 
-        while True:
+        while self.run_event.is_set():
             while self.clone_queue.empty():
                 time.sleep(THREAD_QUEUE_SLEEP)
             repo = self.clone_queue.get()
@@ -413,7 +424,7 @@ class _GitCloner(threading.Thread):
 
             try:
                 self._clone_repository(repo)
-            except Exception as exception:
+            except Exception:
                 pass
 
     def _clone_repository(self, repo):
@@ -439,7 +450,7 @@ class _GitCloner(threading.Thread):
             try:
                 exit_code = subprocess.call(command % (GIT_CLONE_TIMEOUT,
                         repo.url, GIT_CLONE_DIR, repo.name), shell=True)
-            except Exception as exception:
+            except Exception:
                 time.sleep(1)
                 command_attempt += 1
                 if command_attempt == 20:
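One wrinkle the commit leaves open: GitIndexer.run() and _GitCloner.run() still park in an inner `while queue.empty(): time.sleep(THREAD_QUEUE_SLEEP)` loop that never consults run_event, so a worker waiting on an empty queue can delay the joins in crawl() indefinitely. A sketch of one way to make the wait interruptible, not part of this commit and with illustrative names:

    import Queue  # Python 2 stdlib module, as used in the codebase

    def get_next(queue, run_event, poll_interval=1):
        """Block on the queue, but wake up regularly to honor run_event.

        Returns None once run_event is cleared, so a caller's
        `while self.run_event.is_set():` loop can exit promptly instead
        of spinning in `while queue.empty(): time.sleep(...)`.
        """
        while run_event.is_set():
            try:
                return queue.get(timeout=poll_interval)
            except Queue.Empty:
                continue  # timed out; re-check the event and wait again
        return None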

