A copyright violation detector running on Wikimedia Cloud Services https://tools.wmflabs.org/copyvios/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

112 lines
2.9 KiB

  1. # -*- coding: utf-8 -*-
  2. from contextlib import contextmanager
  3. import datetime
  4. from os.path import expanduser, join
  5. from flask import g, request
  6. import oursql
  7. from sqlalchemy.pool import manage
# Rebind the name ``oursql`` to the result of sqlalchemy.pool.manage(), so the
# ``oursql.connect`` call made later in this module goes through that wrapper
# rather than the raw driver module.
# NOTE(review): presumably this is what provides connection pooling; confirm
# against the installed SQLAlchemy version, since manage() is a legacy API.
oursql = manage(oursql)

# Names exported by ``from <this module> import *``.
__all__ = ["Query", "cache", "get_db", "get_notice", "httpsfix", "urlstrip"]
  10. class Query(object):
  11. def __init__(self, method="GET"):
  12. self.query = {}
  13. data = request.form if method == "POST" else request.args
  14. for key in data:
  15. self.query[key] = data.getlist(key)[-1]
  16. def __getattr__(self, key):
  17. return self.query.get(key)
  18. def __setattr__(self, key, value):
  19. if key == "query":
  20. super(Query, self).__setattr__(key, value)
  21. else:
  22. self.query[key] = value
  23. class _AppCache(object):
  24. def __init__(self):
  25. super(_AppCache, self).__setattr__("_data", {})
  26. def __getattr__(self, key):
  27. return self._data[key]
  28. def __setattr__(self, key, value):
  29. self._data[key] = value
  30. cache = _AppCache()
  31. def _connect_to_db(engine, args):
  32. if engine == "mysql":
  33. args["read_default_file"] = expanduser("~/.my.cnf")
  34. args["autoping"] = True
  35. args["autoreconnect"] = True
  36. return oursql.connect(**args)
  37. if engine == "sqlite":
  38. import apsw
  39. dbpath = join(cache.bot.config.root_dir, "copyvios.db")
  40. conn = apsw.Connection(dbpath)
  41. conn.cursor().execute("PRAGMA foreign_keys = ON")
  42. return conn
  43. raise ValueError("Unknown engine: %s" % engine)
  44. def get_db():
  45. if not g._db:
  46. args = cache.bot.config.wiki["_copyviosSQL"].copy()
  47. g._engine = engine = args.pop("engine", "mysql").lower()
  48. g._db = _connect_to_db(engine, args)
  49. return g._db
  50. @contextmanager
  51. def get_cursor(conn):
  52. if g._engine == "mysql":
  53. with conn.cursor() as cursor:
  54. yield cursor
  55. elif g._engine == "sqlite":
  56. with conn:
  57. yield conn.cursor()
  58. else:
  59. raise ValueError("Unknown engine: %s" % g._engine)
  60. def sql_dialect(mysql, sqlite):
  61. if g._engine == "mysql":
  62. return mysql
  63. if g._engine == "sqlite":
  64. return sqlite
  65. raise ValueError("Unknown engine: %s" % g._engine)
  66. def get_notice():
  67. try:
  68. with open(expanduser("~/copyvios_notice.html")) as fp:
  69. lines = fp.read().decode("utf8").strip().splitlines()
  70. if lines[0] == "<!-- active -->":
  71. return "\n".join(lines[1:])
  72. return None
  73. except IOError:
  74. return None
  75. def httpsfix(context, url):
  76. if url.startswith("http://"):
  77. url = url[len("http:"):]
  78. return url
  79. def parse_wiki_timestamp(timestamp):
  80. return datetime.datetime.strptime(timestamp, '%Y%m%d%H%M%S')
  81. def urlstrip(context, url):
  82. if url.startswith("http://"):
  83. url = url[7:]
  84. if url.startswith("https://"):
  85. url = url[8:]
  86. if url.startswith("www."):
  87. url = url[4:]
  88. if url.endswith("/"):
  89. url = url[:-1]
  90. return url