
Commit latest crawler, continue fix of #8.

Add:
    bitshift/crawler/*.py
        - Remove use of the `logging` module, which appeared to be causing a
          memory leak even with log-file rotation.
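
Background on the removal: rotating a log file bounds disk usage, not process
memory, so a leak that grows with the logger's in-process state would survive
rotation. A minimal sketch of the kind of rotating-handler setup the message
alludes to (the file name matches the removed DEBUG_FILE; the size and backup
values are illustrative, not from this repository):

    import logging
    import logging.handlers

    handler = logging.handlers.RotatingFileHandler(
            "crawler.log", maxBytes=10 * 1024 * 1024, backupCount=3)
    handler.setFormatter(logging.Formatter(
            "%(levelname)s %(asctime)s:\t%(threadName)s:\t%(message)s"))
    logging.getLogger().addHandler(handler)
    logging.getLogger().setLevel(logging.DEBUG)
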
tags/v1.0^2
Severyn Kozak, 10 years ago
parent commit ad7ce9d9cf
5 changed files with 45 additions and 58 deletions
  1. app.py                        +3  -1
  2. bitshift/crawler/__init__.py  +1  -6
  3. bitshift/crawler/crawler.py   +16 -14
  4. bitshift/crawler/indexer.py   +24 -36
  5. setup.py                      +1  -1

app.py  (+3, -1)

@@ -5,7 +5,9 @@ Module to contain all the project's Flask server plumbing.
 from flask import Flask
 from flask import render_template, session
 
+from bitshift import assets
 from bitshift.query import parse_query
+from bitshift.crawler import crawl
 
 app = Flask(__name__)
 app.config.from_object("bitshift.config")
@@ -25,4 +27,4 @@ def search(query):
     pass
 
 if __name__ == "__main__":
-    app.run()
+    crawl()
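
Note on the entry-point change: after this hunk, running app.py starts the
crawler threads via crawl() instead of serving HTTP, since app.run() is
replaced rather than supplemented. If both behaviors were wanted in one
process, a variant along these lines would work (a hypothetical sketch, not
part of this commit, assuming crawl() returns after starting its threads,
consistent with the crawl() hunk below):

    if __name__ == "__main__":
        crawl()    # start the crawler/indexer threads, then return
        app.run()  # block here serving the Flask app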

bitshift/crawler/__init__.py  (+1, -6)

@@ -4,7 +4,7 @@
 Contains functions for initializing all subsidiary, threaded crawlers.
 """
 
-import logging, Queue
+import os, Queue
 
 from bitshift.crawler import crawler, indexer
 
@@ -21,11 +21,6 @@ def crawl():
     """
 
     MAX_URL_QUEUE_SIZE = 5e3
-    DEBUG_FILE = "crawler.log"
-
-    logging.basicConfig(filename=DEBUG_FILE,
-            format="%(levelname)s %(asctime)s:\t%(threadName)s:\t%(message)s",
-            level=logging.DEBUG)
 
     repo_clone_queue = Queue.Queue(maxsize=MAX_URL_QUEUE_SIZE)
     threads = [crawler.GitHubCrawler(repo_clone_queue),
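The hunk above ends mid-function; for orientation, crawl() wires a single
bounded queue into the crawler and indexer threads. A sketch of the
presumable full shape (the BitbucketCrawler/GitIndexer membership and the
start-up loop fall outside the visible hunk and are assumptions):

    import Queue

    from bitshift.crawler import crawler, indexer

    def crawl():
        MAX_URL_QUEUE_SIZE = 5e3
        repo_clone_queue = Queue.Queue(maxsize=MAX_URL_QUEUE_SIZE)
        threads = [crawler.GitHubCrawler(repo_clone_queue),
                   crawler.BitbucketCrawler(repo_clone_queue),
                   indexer.GitIndexer(repo_clone_queue)]
        for thread in threads:
            thread.start()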


bitshift/crawler/crawler.py  (+16, -14)

@@ -4,7 +4,7 @@
 Contains all website/framework-specific Class crawlers.
 """
 
-import logging, requests, time, threading
+import requests, time, threading
 
 from bitshift.crawler import indexer
 
@@ -34,7 +34,6 @@ class GitHubCrawler(threading.Thread):
         """
 
         self.clone_queue = clone_queue
-        # logging.info("Starting %s." % self.__class__.__name__)
         super(GitHubCrawler, self).__init__(name=self.__class__.__name__)
 
     def run(self):
@@ -57,14 +56,15 @@ class GitHubCrawler(threading.Thread):
 
         while len(next_api_url) > 0:
             start_time = time.time()
-            response = requests.get(next_api_url, params=authentication_params)
+
+            try:
+                response = requests.get(next_api_url,
+                        params=authentication_params)
+            except ConnectionError as exception:
+                continue
 
             queue_percent_full = (float(self.clone_queue.qsize()) /
                     self.clone_queue.maxsize) * 100
-            # logging.info("API call made. Limit remaining: %s. Queue-size: (%d"
-            #         "%%) %d/%d" % (response.headers["x-ratelimit-remaining"],
-            #         queue_percent_full, self.clone_queue.qsize(),
-            #         self.clone_queue.maxsize))
 
             for repo in response.json():
                 while self.clone_queue.full():
@@ -107,7 +107,6 @@ class BitbucketCrawler(threading.Thread):
         """
 
         self.clone_queue = clone_queue
-        # logging.info("Starting %s." % self.__class__.__name__)
         super(BitbucketCrawler, self).__init__(name=self.__class__.__name__)
 
     def run(self):
@@ -123,13 +122,14 @@ class BitbucketCrawler(threading.Thread):
         next_api_url = "https://api.bitbucket.org/2.0/repositories"
 
         while True:
-            response = requests.get(next_api_url).json()
+            try:
+                response = requests.get(next_api_url).json()
+            except ConnectionError as exception:
+                time.sleep(0.5)
+                continue
 
             queue_percent_full = (float(self.clone_queue.qsize()) /
                     self.clone_queue.maxsize) * 100
-            # logging.info("API call made. Queue-size: (%d%%) %d/%d" % (
-            #         queue_percent_full, self.clone_queue.qsize(),
-            #         self.clone_queue.maxsize))
 
             for repo in response["values"]:
                 if repo["scm"] == "git":
@@ -137,10 +137,12 @@ class BitbucketCrawler(threading.Thread):
                     time.sleep(1)
 
                 clone_links = repo["links"]["clone"]
-                clone_url = (clone[0]["href"] if clone[0]["name"] == "https"
-                        else clone[1]["href"])
+                clone_url = (clone_links[0]["href"] if
+                        clone_links[0]["name"] == "https" else
+                        clone_links[1]["href"])
                 links.append("clone_url")
                 self.clone_queue.put(indexer.GitRepository(
                         clone_url, repo["full_name"], "Bitbucket"))
 
                 next_api_url = response["next"]
+                time.sleep(0.2)
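
One caveat the new except clauses share: this codebase runs on Python 2 (it
imports Queue), and Python 2 has no builtin ConnectionError, so the bare name
would itself raise a NameError when a request fails. The presumable intent is
requests' own exception, which has to be imported explicitly. A minimal
sketch of that pattern, with a hypothetical fetch_json() helper:

    import requests
    from requests.exceptions import ConnectionError

    def fetch_json(url, params=None):
        # Retry the GET until the network cooperates, mirroring the
        # retry-and-continue behavior of the crawler loops above.
        while True:
            try:
                return requests.get(url, params=params).json()
            except ConnectionError:
                continue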

bitshift/crawler/indexer.py  (+24, -36)

@@ -3,7 +3,7 @@
 repositories.
 """
 
-import bs4, logging, os, Queue, re, shutil, string, subprocess, time, threading
+import bs4, os, Queue, re, shutil, string, subprocess, time, threading
 
 from ..database import Database
 from ..codelet import Codelet
@@ -63,12 +63,9 @@ class GitIndexer(threading.Thread):
 
         MAX_INDEX_QUEUE_SIZE = 10
 
-        # logging.info("Starting.")
-
         self.index_queue = Queue.Queue(maxsize=MAX_INDEX_QUEUE_SIZE)
         self.git_cloner = _GitCloner(clone_queue, self.index_queue)
         self.git_cloner.start()
-        self.codelet_count = 0 #debug
 
         if not os.path.exists(GIT_CLONE_DIR):
             os.makedirs(GIT_CLONE_DIR)
@@ -91,7 +88,10 @@ class GitIndexer(threading.Thread):
 
             repo = self.index_queue.get()
             self.index_queue.task_done()
-            self._index_repository(repo.url, repo.name, repo.framework_name)
+            try:
+                self._index_repository(repo.url, repo.name, repo.framework_name)
+            except Exception as exception:
+                pass
 
     def _index_repository(self, repo_url, repo_name, framework_name):
         """
@@ -109,15 +109,11 @@ class GitIndexer(threading.Thread):
         :type framework_name: str
         """
 
-        # logging.info("Indexing repository %s." % repo_url)
         with _ChangeDir("%s/%s" % (GIT_CLONE_DIR, repo_name)) as repository_dir:
             try:
                 self._insert_repository_codelets(repo_url, repo_name,
                         framework_name)
             except Exception as exception:
-                # logging.warning(
-                #         "_insert_repository_codelets() failed: %s: %s: %s" %
-                #         (exception.__class__.__name__, exception, repo_url))
                 pass
 
         if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo_name)):
@@ -141,17 +137,18 @@ class GitIndexer(threading.Thread):
         """
 
         commits_meta = _get_commits_metadata()
+        if commits_meta is None:
+            return
+
        for filename in commits_meta.keys():
            try:
-                with open(filename, "r") as source_file:
+                source = ""
+                with open(filename) as source_file:
                    source = _decode(source_file.read())
                if source is None:
-                    return
+                    continue
            except IOError as exception:
-                # logging.warning(
-                #         "_insert_repository_codelets() failed: %s: %s: %s" %
-                #         (exception.__class__.__name__, exception, repo_url))
-                pass
+                continue
 
            authors = [(_decode(author),) for author in \
                    commits_meta[filename]["authors"]]
@@ -161,10 +158,6 @@ class GitIndexer(threading.Thread):
                    commits_meta[filename]["time_created"],
                    commits_meta[filename]["time_last_modified"])
 
-            self.codelet_count += 1 #debug
-            if self.codelet_count % 500 == 0: #debug
-                logging.info("Number of codelets indexed: %d.", self.codelet_count) #debug
-
            # Database.insert(codelet)
 
 class _GitCloner(threading.Thread):
@@ -191,8 +184,6 @@ class _GitCloner(threading.Thread):
         :type index_queue: see :attr:`self.index_queue`
         """
 
-        # logging.info("Starting.")
-
         self.clone_queue = clone_queue
         self.index_queue = index_queue
         super(_GitCloner, self).__init__(name=self.__class__.__name__)
@@ -212,7 +203,11 @@ class _GitCloner(threading.Thread):
             time.sleep(THREAD_QUEUE_SLEEP)
             repo = self.clone_queue.get()
             self.clone_queue.task_done()
-            self._clone_repository(repo)
+
+            try:
+                self._clone_repository(repo)
+            except Exception as exception:
+                pass
 
     def _clone_repository(self, repo):
         """
@@ -227,29 +222,27 @@ class _GitCloner(threading.Thread):
 
         queue_percent_full = (float(self.index_queue.qsize()) /
                 self.index_queue.maxsize) * 100
-        # logging.info("Cloning %s. Queue-size: (%d%%) %d/%d" % (repo.url,
-        #         queue_percent_full, self.index_queue.qsize(),
-        #         self.index_queue.maxsize))
 
         exit_code = None
         command = ("perl -e 'alarm shift @ARGV; exec @ARGV' %d git clone"
                 " --single-branch %s %s/%s || pkill -f git")
 
+        command_attempt = 0
         while exit_code is None:
             try:
                 exit_code = subprocess.call(command % (GIT_CLONE_TIMEOUT,
                         repo.url, GIT_CLONE_DIR, repo.name), shell=True)
-            except:
-                # logging.warning("_clone_repository() failed: %s: %s",
-                #         exception.__class__.__name__, exception)
+            except Exception as exception:
                 time.sleep(1)
-                continue
+                command_attempt += 1
+                if command_attempt == 20:
+                    break
+                else:
+                    continue
             else:
                 break
 
         if exit_code != 0:
-            # logging.warning("_clone_repository(): Cloning %s failed." %
-            #         repo.url)
             if os.path.isdir("%s/%s" % (GIT_CLONE_DIR, repo.name)):
                 shutil.rmtree("%s/%s" % (GIT_CLONE_DIR, repo.name))
             return
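
A note on the retry hunk above: the clone command imposes a timeout through
perl, since "alarm shift @ARGV" arms a SIGALRM that fires after the first
argument's number of seconds, and "exec @ARGV" replaces the perl process with
the remaining arguments (the alarm survives the exec, so the clone is killed
when time runs out); "|| pkill -f git" then cleans up any git process left
behind. The new command_attempt counter additionally caps the retries at 20.
An illustrative expansion of the format string, using a hypothetical
GIT_CLONE_TIMEOUT of 600 seconds and GIT_CLONE_DIR of "repositories" (both
constants are defined elsewhere in indexer.py):

    command = ("perl -e 'alarm shift @ARGV; exec @ARGV' %d git clone"
            " --single-branch %s %s/%s || pkill -f git")
    print command % (600, "https://github.com/user/repo.git",
            "repositories", "user/repo")
    # perl -e 'alarm shift @ARGV; exec @ARGV' 600 git clone --single-branch
    #     https://github.com/user/repo.git repositories/user/repo
    #     || pkill -f git
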
@@ -331,7 +324,6 @@ def _generate_file_url(filename, repo_url, framework_name):
         return ("%s/src/%s/%s" % (repo_url, commit_hash,
                 filename)).replace("//", "/")
     except subprocess.CalledProcessError as exception:
-        # logging.warning("_generate_file_url() failed: %s", exception)
         return None
 
 def _get_git_commits():
@@ -467,8 +459,6 @@ def _decode(raw):
         return raw.decode(encoding) if encoding is not None else None
 
     except (LookupError, UnicodeDecodeError, UserWarning) as exception:
-        # logging.warning("_decode() failed: %s: %s",
-        #         exception.__class__.__name__, exception)
         return None
 
 def _is_ascii(filename):
@@ -507,6 +497,4 @@ def _is_ascii(filename):
         return not float(len(non_ascii)) / len(file_snippet) > 0.30
 
     except IOError as exception:
-        # logging.warning("_is_ascii() failed: %s: %s",
-        #         exception.__class__.__name__, exception)
         return False
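
For orientation, the _is_ascii() hunk keeps the 30% heuristic: a file is
treated as text only if at most 30% of the sampled snippet is non-ASCII. A
worked check of the cutoff (the snippet contents are invented, and the
assumption that non_ascii holds the bytes above 127 comes from the function's
name rather than the visible hunk):

    file_snippet = "a" * 312 + "\xff" * 200   # 512 bytes, 200 non-ASCII
    non_ascii = [c for c in file_snippet if ord(c) > 127]
    ratio = float(len(non_ascii)) / len(file_snippet)   # ~0.39
    print not ratio > 0.30   # False: ~39% non-ASCII, rejected as binary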

setup.py  (+1, -1)

@@ -5,7 +5,7 @@ setup(
     version = "0.1",
     packages = find_packages(),
     install_requires = ["Flask>=0.10.1", "pygments>=1.6", "requests>=2.2.0",
-        "BeautifulSoup>=3.2.1"],
+        "beautifulsoup4>=3.2.1", "oursql>=0.9.3.1"],
     author = "Benjamin Attal, Ben Kurtovic, Severyn Kozak",
     license = "MIT",
     url = "https://github.com/earwig/bitshift"
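
The dependency swap matches the bs4 import in indexer.py, but note that
beautifulsoup4 has only ever shipped 4.x releases, so the carried-over
">=3.2.1" floor (BeautifulSoup 3's version number) is satisfied by every
available release. A floor in the 4.x range, along these hypothetical lines,
presumably expresses the actual requirement:

    install_requires = ["Flask>=0.10.1", "pygments>=1.6", "requests>=2.2.0",
        "beautifulsoup4>=4.1.0", "oursql>=0.9.3.1"]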

