A copyright violation detector running on Wikimedia Cloud Services https://tools.wmflabs.org/copyvios/
Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 
 

119 rader
3.1 KiB

  1. # -*- coding: utf-8 -*-
  2. from contextlib import contextmanager
  3. import datetime
  4. from os.path import expanduser, join
  5. import apsw
  6. from flask import g, request
  7. import oursql
  8. from sqlalchemy.pool import manage
  9. oursql = manage(oursql)
  10. __all__ = ["Query", "cache", "get_db", "get_notice", "httpsfix", "urlstrip"]
  11. class Query(object):
  12. def __init__(self, method="GET"):
  13. self.query = {}
  14. data = request.form if method == "POST" else request.args
  15. for key in data:
  16. self.query[key] = data.getlist(key)[-1]
  17. def __getattr__(self, key):
  18. return self.query.get(key)
  19. def __setattr__(self, key, value):
  20. if key == "query":
  21. super(Query, self).__setattr__(key, value)
  22. else:
  23. self.query[key] = value
  24. class _AppCache(object):
  25. def __init__(self):
  26. super(_AppCache, self).__setattr__("_data", {})
  27. def __getattr__(self, key):
  28. return self._data[key]
  29. def __setattr__(self, key, value):
  30. self._data[key] = value
  31. cache = _AppCache()
  32. def _connect_to_db(engine, args):
  33. if engine == "mysql":
  34. args["read_default_file"] = expanduser("~/.my.cnf")
  35. args["autoping"] = True
  36. args["autoreconnect"] = True
  37. return oursql.connect(**args)
  38. if engine == "sqlite":
  39. dbpath = join(cache.bot.config.root_dir, "copyvios.db")
  40. conn = apsw.Connection(dbpath)
  41. conn.cursor().execute("PRAGMA foreign_keys = ON")
  42. return conn
  43. raise ValueError("Unknown engine: %s" % engine)
  44. def get_db():
  45. if not g._db:
  46. args = cache.bot.config.wiki["_copyviosSQL"].copy()
  47. g._engine = engine = args.pop("engine", "mysql").lower()
  48. g._db = _connect_to_db(engine, args)
  49. return g._db
  50. @contextmanager
  51. def get_cursor(conn):
  52. if g._engine == "mysql":
  53. with conn.cursor() as cursor:
  54. yield cursor
  55. elif g._engine == "sqlite":
  56. with conn:
  57. yield conn.cursor()
  58. else:
  59. raise ValueError("Unknown engine: %s" % g._engine)
  60. def get_sql_error():
  61. if g._engine == "mysql":
  62. return oursql.Error
  63. if g._engine == "sqlite":
  64. return apsw.Error
  65. raise ValueError("Unknown engine: %s" % g._engine)
  66. def sql_dialect(mysql, sqlite):
  67. if g._engine == "mysql":
  68. return mysql
  69. if g._engine == "sqlite":
  70. return sqlite
  71. raise ValueError("Unknown engine: %s" % g._engine)
  72. def get_notice():
  73. try:
  74. with open(expanduser("~/copyvios_notice.html")) as fp:
  75. lines = fp.read().decode("utf8").strip().splitlines()
  76. if lines[0] == "<!-- active -->":
  77. return "\n".join(lines[1:])
  78. return None
  79. except IOError:
  80. return None
  81. def httpsfix(context, url):
  82. if url.startswith("http://"):
  83. url = url[len("http:"):]
  84. return url
  85. def parse_wiki_timestamp(timestamp):
  86. return datetime.datetime.strptime(timestamp, '%Y%m%d%H%M%S')
  87. def urlstrip(context, url):
  88. if url.startswith("http://"):
  89. url = url[7:]
  90. if url.startswith("https://"):
  91. url = url[8:]
  92. if url.startswith("www."):
  93. url = url[4:]
  94. if url.endswith("/"):
  95. url = url[:-1]
  96. return url