A copyright violation detector running on Wikimedia Cloud Services https://tools.wmflabs.org/copyvios/
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

103 lines
2.7 KiB

  1. # -*- coding: utf-8 -*-
  2. from contextlib import contextmanager
  3. import datetime
  4. from os.path import expanduser, join
  5. from flask import g, request
  6. import oursql
  7. from sqlalchemy.pool import manage
  8. oursql = manage(oursql)
  9. __all__ = ["Query", "cache", "get_db", "get_notice", "httpsfix", "urlstrip"]
  10. class Query(object):
  11. def __init__(self, method="GET"):
  12. self.query = {}
  13. data = request.form if method == "POST" else request.args
  14. for key in data:
  15. self.query[key] = data.getlist(key)[-1]
  16. def __getattr__(self, key):
  17. return self.query.get(key)
  18. def __setattr__(self, key, value):
  19. if key == "query":
  20. super(Query, self).__setattr__(key, value)
  21. else:
  22. self.query[key] = value
  23. class _AppCache(object):
  24. def __init__(self):
  25. super(_AppCache, self).__setattr__("_data", {})
  26. def __getattr__(self, key):
  27. return self._data[key]
  28. def __setattr__(self, key, value):
  29. self._data[key] = value
  30. cache = _AppCache()
  31. def _connect_to_db(engine, args):
  32. if engine == "mysql":
  33. args["read_default_file"] = expanduser("~/.my.cnf")
  34. args["autoping"] = True
  35. args["autoreconnect"] = True
  36. return oursql.connect(**args)
  37. if engine == "sqlite":
  38. import apsw
  39. dbpath = join(cache.bot.config.root_dir, "copyvios.db")
  40. return apsw.Connection(dbpath)
  41. raise ValueError("Unknown engine: %s" % engine)
  42. def get_db():
  43. if not g._db:
  44. args = cache.bot.config.wiki["_copyviosSQL"].copy()
  45. g._engine = engine = args.pop("engine", "mysql").lower()
  46. g._db = _connect_to_db(engine, args)
  47. return g._db
  48. @contextmanager
  49. def get_cursor(conn):
  50. if g._engine == "mysql":
  51. with conn.cursor() as cursor:
  52. yield cursor
  53. elif g._engine == "sqlite":
  54. with conn:
  55. yield conn.cursor()
  56. else:
  57. raise ValueError("Unknown engine: %s" % g._engine)
  58. def get_notice():
  59. try:
  60. with open(expanduser("~/copyvios_notice.html")) as fp:
  61. lines = fp.read().decode("utf8").strip().splitlines()
  62. if lines[0] == "<!-- active -->":
  63. return "\n".join(lines[1:])
  64. return None
  65. except IOError:
  66. return None
  67. def httpsfix(context, url):
  68. if url.startswith("http://"):
  69. url = url[len("http:"):]
  70. return url
  71. def parse_wiki_timestamp(timestamp):
  72. return datetime.datetime.strptime(timestamp, '%Y%m%d%H%M%S')
  73. def urlstrip(context, url):
  74. if url.startswith("http://"):
  75. url = url[7:]
  76. if url.startswith("https://"):
  77. url = url[8:]
  78. if url.startswith("www."):
  79. url = url[4:]
  80. if url.endswith("/"):
  81. url = url[:-1]
  82. return url