A copyright violation detector running on Wikimedia Cloud Services https://tools.wmflabs.org/copyvios/
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

119 lines
3.1 KiB

  1. # -*- coding: utf-8 -*-
  2. from contextlib import contextmanager
  3. import datetime
  4. from os.path import expanduser, join
  5. import apsw
  6. from flask import g, request
  7. import oursql
  8. from sqlalchemy.pool import manage
# Rebind the name ``oursql`` to the object returned by SQLAlchemy's
# ``pool.manage``; every later ``oursql.connect`` / ``oursql.Error`` lookup in
# this module goes through that wrapper.
# NOTE(review): presumably this makes connections pooled -- confirm against
# the installed SQLAlchemy version.
oursql = manage(oursql)
# Names exported by ``from <this module> import *``.
__all__ = ["Query", "cache", "get_db", "get_notice", "httpsfix", "urlstrip"]
  11. class Query(object):
  12. def __init__(self, method="GET"):
  13. self.query = {}
  14. data = request.form if method == "POST" else request.args
  15. for key in data:
  16. self.query[key] = data.getlist(key)[-1]
  17. def __getattr__(self, key):
  18. return self.query.get(key)
  19. def __setattr__(self, key, value):
  20. if key == "query":
  21. super(Query, self).__setattr__(key, value)
  22. else:
  23. self.query[key] = value
  24. class _AppCache(object):
  25. def __init__(self):
  26. super(_AppCache, self).__setattr__("_data", {})
  27. def __getattr__(self, key):
  28. return self._data[key]
  29. def __setattr__(self, key, value):
  30. self._data[key] = value
  31. cache = _AppCache()
  32. def _connect_to_db(engine, args):
  33. if engine == "mysql":
  34. args["read_default_file"] = expanduser("~/.my.cnf")
  35. args["autoping"] = True
  36. args["autoreconnect"] = True
  37. return oursql.connect(**args)
  38. if engine == "sqlite":
  39. dbpath = join(cache.bot.config.root_dir, "copyvios.db")
  40. conn = apsw.Connection(dbpath)
  41. conn.cursor().execute("PRAGMA foreign_keys = ON")
  42. return conn
  43. raise ValueError("Unknown engine: %s" % engine)
  44. def get_db():
  45. if not g._db:
  46. args = cache.bot.config.wiki["_copyviosSQL"].copy()
  47. g._engine = engine = args.pop("engine", "mysql").lower()
  48. g._db = _connect_to_db(engine, args)
  49. return g._db
  50. @contextmanager
  51. def get_cursor(conn):
  52. if g._engine == "mysql":
  53. with conn.cursor() as cursor:
  54. yield cursor
  55. elif g._engine == "sqlite":
  56. with conn:
  57. yield conn.cursor()
  58. else:
  59. raise ValueError("Unknown engine: %s" % g._engine)
  60. def get_sql_error():
  61. if g._engine == "mysql":
  62. return oursql.Error
  63. if g._engine == "sqlite":
  64. return apsw.Error
  65. raise ValueError("Unknown engine: %s" % g._engine)
  66. def sql_dialect(mysql, sqlite):
  67. if g._engine == "mysql":
  68. return mysql
  69. if g._engine == "sqlite":
  70. return sqlite
  71. raise ValueError("Unknown engine: %s" % g._engine)
  72. def get_notice():
  73. try:
  74. with open(expanduser("~/copyvios_notice.html")) as fp:
  75. lines = fp.read().decode("utf8").strip().splitlines()
  76. if lines[0] == "<!-- active -->":
  77. return "\n".join(lines[1:])
  78. return None
  79. except IOError:
  80. return None
  81. def httpsfix(context, url):
  82. if url.startswith("http://"):
  83. url = url[len("http:"):]
  84. return url
  85. def parse_wiki_timestamp(timestamp):
  86. return datetime.datetime.strptime(timestamp, '%Y%m%d%H%M%S')
  87. def urlstrip(context, url):
  88. if url.startswith("http://"):
  89. url = url[7:]
  90. if url.startswith("https://"):
  91. url = url[8:]
  92. if url.startswith("www."):
  93. url = url[4:]
  94. if url.endswith("/"):
  95. url = url[:-1]
  96. return url