Browse Source

Substantial rework to reminders; fixes multithreading issues.

tags/v0.3
Ben Kurtovic 9 years ago
parent
commit
a28eac9426
4 changed files with 168 additions and 102 deletions
  1. +2
    -1
      CHANGELOG
  2. +2
    -1
      earwigbot/bot.py
  3. +163
    -96
      earwigbot/commands/remind.py
  4. +1
    -4
      earwigbot/commands/threads.py

+ 2
- 1
CHANGELOG View File

@@ -1,7 +1,8 @@
v0.3 (unreleased): v0.3 (unreleased):


- Copyvio detector: improved sentence splitting algorithm. - Copyvio detector: improved sentence splitting algorithm.
- IRC > !remind: Added !remind all. Improved time detection.
- IRC > !remind: Added !remind all. Fixed multithreading efficiency issues.
Improved time detection.
- IRC: Improved detection of maximum IRC message length. - IRC: Improved detection of maximum IRC message length.
- IRC: Improved some help commands. - IRC: Improved some help commands.




+ 2
- 1
earwigbot/bot.py View File

@@ -150,7 +150,8 @@ class Bot(object):
component_names = self.config.components.keys() component_names = self.config.components.keys()
skips = component_names + ["MainThread", "reminder", "irc:quit"] skips = component_names + ["MainThread", "reminder", "irc:quit"]
for thread in enumerate_threads(): for thread in enumerate_threads():
if thread.name not in skips and thread.is_alive():
if thread.is_alive() and not any(
thread.name.startswith(skip) for skip in skips):
tasks.append(thread.name) tasks.append(thread.name)
if tasks: if tasks:
log = "The following commands or tasks will be killed: {0}" log = "The following commands or tasks will be killed: {0}"


+ 163
- 96
earwigbot/commands/remind.py View File

@@ -21,7 +21,6 @@
# SOFTWARE. # SOFTWARE.


import ast import ast
from contextlib import contextmanager
from itertools import chain from itertools import chain
import operator import operator
import random import random
@@ -89,12 +88,6 @@ class Remind(Command):
raise ValueError(parsed) raise ValueError(parsed)
return parsed return parsed


@contextmanager
def _db(self):
"""Return a threadsafe context manager for the permissions database."""
with self._db_lock:
yield self.config.irc["permissions"]

def _really_get_reminder_by_id(self, user, rid): def _really_get_reminder_by_id(self, user, rid):
"""Return the _Reminder object that corresponds to a particular ID. """Return the _Reminder object that corresponds to a particular ID.


@@ -124,11 +117,11 @@ class Remind(Command):


def _start_reminder(self, reminder, user): def _start_reminder(self, reminder, user):
"""Start the given reminder object for the given user.""" """Start the given reminder object for the given user."""
reminder.start()
if user in self.reminders: if user in self.reminders:
self.reminders[user].append(reminder) self.reminders[user].append(reminder)
else: else:
self.reminders[user] = [reminder] self.reminders[user] = [reminder]
self._thread.add(reminder)


def _create_reminder(self, data, user): def _create_reminder(self, data, user):
"""Create a new reminder for the given user.""" """Create a new reminder for the given user."""
@@ -143,7 +136,6 @@ class Remind(Command):
msg = "Given time \x02{0}\x0F is too large. Keep it reasonable." msg = "Given time \x02{0}\x0F is too large. Keep it reasonable."
return self.reply(data, msg.format(data.args[0])) return self.reply(data, msg.format(data.args[0]))


end = time.time() + wait
message = " ".join(data.args[1:]) message = " ".join(data.args[1:])
try: try:
rid = self._get_new_id() rid = self._get_new_id()
@@ -151,7 +143,7 @@ class Remind(Command):
msg = "Couldn't set a new reminder: no free IDs available." msg = "Couldn't set a new reminder: no free IDs available."
return self.reply(data, msg) return self.reply(data, msg)


reminder = _Reminder(rid, user, wait, end, message, data, self)
reminder = _Reminder(rid, user, wait, message, data, self)
self._start_reminder(reminder, user) self._start_reminder(reminder, user)
msg = "Set reminder \x0303{0}\x0F ({1})." msg = "Set reminder \x0303{0}\x0F ({1})."
self.reply(data, msg.format(rid, reminder.end_time)) self.reply(data, msg.format(rid, reminder.end_time))
@@ -165,7 +157,8 @@ class Remind(Command):


def _cancel_reminder(self, data, user, reminder): def _cancel_reminder(self, data, user, reminder):
"""Cancel a pending reminder.""" """Cancel a pending reminder."""
reminder.stop()
self._thread.remove(reminder)
self.unstore_reminder(reminder.id)
self.reminders[user].remove(reminder) self.reminders[user].remove(reminder)
if not self.reminders[user]: if not self.reminders[user]:
del self.reminders[user] del self.reminders[user]
@@ -174,35 +167,34 @@ class Remind(Command):


def _snooze_reminder(self, data, reminder, arg=None): def _snooze_reminder(self, data, reminder, arg=None):
"""Snooze a reminder to be re-triggered after a period of time.""" """Snooze a reminder to be re-triggered after a period of time."""
verb = "snoozed" if reminder.end < time.time() else "adjusted"
verb = "snoozed" if reminder.expired else "adjusted"
duration = None
if arg: if arg:
try: try:
duration = self._parse_time(data.args[arg]) duration = self._parse_time(data.args[arg])
reminder.wait = duration
except (IndexError, ValueError): except (IndexError, ValueError):
pass pass


reminder.end = time.time() + reminder.wait
reminder.start()
reminder.reset(duration)
end = time.strftime("%b %d %H:%M:%S %Z", time.localtime(reminder.end)) end = time.strftime("%b %d %H:%M:%S %Z", time.localtime(reminder.end))
msg = "Reminder \x0303{0}\x0F {1} until {2}." msg = "Reminder \x0303{0}\x0F {1} until {2}."
self.reply(data, msg.format(reminder.id, verb, end)) self.reply(data, msg.format(reminder.id, verb, end))


def _load_reminders(self): def _load_reminders(self):
"""Load previously made reminders from the database.""" """Load previously made reminders from the database."""
with self._db() as permdb:
try:
database = permdb.get_attr("command:remind", "data")
except KeyError:
return
permdb.set_attr("command:remind", "data", "[]")
permdb = self.config.irc["permissions"]
try:
database = permdb.get_attr("command:remind", "data")
except KeyError:
return
permdb.set_attr("command:remind", "data", "[]")


for item in ast.literal_eval(database): for item in ast.literal_eval(database):
rid, user, wait, end, message, data = item rid, user, wait, end, message, data = item
if end < time.time(): if end < time.time():
continue continue
data = Data.unserialize(data) data = Data.unserialize(data)
reminder = _Reminder(rid, user, wait, end, message, data, self)
reminder = _Reminder(rid, user, wait, message, data, self, end)
self._start_reminder(reminder, user) self._start_reminder(reminder, user)


def _handle_command(self, command, data, user, reminder, arg=None): def _handle_command(self, command, data, user, reminder, arg=None):
@@ -299,12 +291,8 @@ class Remind(Command):
joined = " ".join("{0}: \x0306{1}\x0F.".format(k, v) for k, v in parts) joined = " ".join("{0}: \x0306{1}\x0F.".format(k, v) for k, v in parts)
self.reply(data, joined + " " + extra) self.reply(data, joined + " " + extra)


def setup(self):
self.reminders = {}
self._db_lock = RLock()
self._load_reminders()

def process(self, data):
def _process(self, data):
"""Main entry point."""
if data.command == "snooze": if data.command == "snooze":
return self._process_snooze_command(data, data.host) return self._process_snooze_command(data, data.host)
if data.command in ["cancel", "unremind", "forget"]: if data.command in ["cancel", "unremind", "forget"]:
@@ -350,67 +338,129 @@ class Remind(Command):


self._handle_command(data.args[1], data, user, reminder, 2) self._handle_command(data.args[1], data, user, reminder, 2)


@property
def lock(self):
"""Return the reminder modification/access lock."""
return self._lock

def setup(self):
self.reminders = {}
self._lock = RLock()
self._thread = _ReminderThread(self._lock)
self._load_reminders()

def process(self, data):
with self.lock:
self._process(data)

def unload(self): def unload(self):
for reminder in chain(*self.reminders.values()):
reminder.stop(delete=False)
self._thread.stop()


def store_reminder(self, reminder): def store_reminder(self, reminder):
"""Store a serialized reminder into the database.""" """Store a serialized reminder into the database."""
with self._db() as permdb:
try:
dump = permdb.get_attr("command:remind", "data")
except KeyError:
dump = "[]"
permdb = self.config.irc["permissions"]
try:
dump = permdb.get_attr("command:remind", "data")
except KeyError:
dump = "[]"


database = ast.literal_eval(dump)
database.append(reminder)
permdb.set_attr("command:remind", "data", str(database))
database = ast.literal_eval(dump)
database.append(reminder)
permdb.set_attr("command:remind", "data", str(database))


def unstore_reminder(self, rid): def unstore_reminder(self, rid):
"""Remove a reminder from the database by ID.""" """Remove a reminder from the database by ID."""
with self._db() as permdb:
try:
dump = permdb.get_attr("command:remind", "data")
except KeyError:
dump = "[]"
permdb = self.config.irc["permissions"]
try:
dump = permdb.get_attr("command:remind", "data")
except KeyError:
dump = "[]"

database = ast.literal_eval(dump)
database = [item for item in database if item[0] != rid]
permdb.set_attr("command:remind", "data", str(database))


class _ReminderThread(object):
"""A single thread that handles reminders."""

def __init__(self, lock):
self._thread = None
self._abort = False
self._active = {}
self._lock = lock

def _running(self):
"""Return if the thread should still be running."""
return self._active and not self._abort

def _get_soonest(self):
"""Get the soonest reminder to trigger."""
return min(self._active.values(), key=lambda robj: robj.end)

def _get_ready_reminder(self):
"""Block until a reminder is ready to be triggered."""
while self._running():
if self._get_soonest().end <= time.time():
return self._get_soonest()
self._lock.release()
time.sleep(0.25)
self._lock.acquire()

def _callback(self):
"""Internal callback function to be executed by the reminder thread."""
with self._lock:
while True:
reminder = self._get_ready_reminder()
if not reminder:
break

if reminder.trigger():
del self._active[reminder.id]
self._thread = None

def _start(self):
"""Start the thread."""
self._thread = Thread(target=self._callback, name="reminder")
self._thread.daemon = True
self._thread.start()
self._abort = False

def add(self, reminder):
"""Add a reminder to the table of active reminders."""
self._active[reminder.id] = reminder
if not self._thread:
self._start()

def remove(self, reminder):
"""Remove a reminder from the table of active reminders."""
if reminder.id in self._active:
del self._active[reminder.id]
if not self._active:
self.stop()

def stop(self):
"""Stop the thread."""
if not self._thread:
return
self._abort = True
self._thread = None


database = ast.literal_eval(dump)
database = [item for item in database if item[0] != rid]
permdb.set_attr("command:remind", "data", str(database))


class _Reminder(object): class _Reminder(object):
"""Represents a single reminder.""" """Represents a single reminder."""

def __init__(self, rid, user, wait, end, message, data, cmdobj):
def __init__(self, rid, user, wait, message, data, cmdobj, end=None):
self.id = rid self.id = rid
self.wait = wait self.wait = wait
self.end = end
self.end = time.time() + wait if end is None else end
self.message = message self.message = message


self._user = user self._user = user
self._data = data self._data = data
self._cmdobj = cmdobj self._cmdobj = cmdobj
self._thread = None
self._expired = False


def _callback(self):
"""Internal callback function to be executed by the reminder thread."""
thread = self._thread
while time.time() < thread.end:
time.sleep(1)
if thread.abort:
return
self._cmdobj.reply(self._data, self.message)
self._delete()
for i in xrange(60):
time.sleep(1)
if thread.abort:
return
try:
self._cmdobj.reminders[self._user].remove(self)
if not self._cmdobj.reminders[self._user]:
del self._cmdobj.reminders[self._user]
except (KeyError, ValueError): # Already canceled by the user
pass
self._save()


def _save(self): def _save(self):
"""Save this reminder to the database.""" """Save this reminder to the database."""
@@ -418,9 +468,21 @@ class _Reminder(object):
item = (self.id, self._user, self.wait, self.end, self.message, data) item = (self.id, self._user, self.wait, self.end, self.message, data)
self._cmdobj.store_reminder(item) self._cmdobj.store_reminder(item)


def _delete(self):
"""Remove this reminder from the database."""
def _fire(self):
"""Activate the reminder for the user."""
self._cmdobj.reply(self._data, self.message)
self._cmdobj.unstore_reminder(self.id) self._cmdobj.unstore_reminder(self.id)
self.end = time.time() + 60
self._expired = True

def _finalize(self):
"""Clean up after a reminder has been expired for too long."""
try:
self._cmdobj.reminders[self._user].remove(self)
if not self._cmdobj.reminders[self._user]:
del self._cmdobj.reminders[self._user]
except (KeyError, ValueError): # Already canceled by the user
pass


@property @property
def data(self): def data(self):
@@ -430,30 +492,35 @@ class _Reminder(object):
@property @property
def end_time(self): def end_time(self):
"""Return a string representing the end time of a reminder.""" """Return a string representing the end time of a reminder."""
if self.end >= time.time():
lctime = time.localtime(self.end)
if lctime.tm_year == time.localtime().tm_year:
ends = time.strftime("%b %d %H:%M:%S %Z", lctime)
else:
ends = time.strftime("%b %d, %Y %H:%M:%S %Z", lctime)
return "ends {0}".format(ends)
return "expired"

def start(self):
"""Start the reminder timer thread. Stops it if already running."""
self.stop()
self._thread = Thread(target=self._callback, name="remind-" + self.id)
self._thread.end = self.end
self._thread.daemon = True
self._thread.abort = False
self._thread.start()
if self._expired or self.end < time.time():
return "expired"
lctime = time.localtime(self.end)
if lctime.tm_year == time.localtime().tm_year:
ends = time.strftime("%b %d %H:%M:%S %Z", lctime)
else:
ends = time.strftime("%b %d, %Y %H:%M:%S %Z", lctime)
return "ends {0}".format(ends)

@property
def expired(self):
"""Return whether the reminder is expired."""
return self._expired

def reset(self, wait=None):
"""Reactivate a reminder."""
if wait is not None:
self.wait = wait
self.end = self.wait + time.time()
self._expired = False

self._cmdobj.unstore_reminder(self.id)
self._save() self._save()


def stop(self, delete=True):
"""Stop a currently running reminder."""
if not self._thread:
return
if delete:
self._delete()
self._thread.abort = True
self._thread = None
def trigger(self):
"""Hook run by the reminder thread."""
if not self._expired:
self._fire()
return False
else:
self._finalize()
return True

+ 1
- 4
earwigbot/commands/threads.py View File

@@ -71,14 +71,11 @@ class Threads(Command):
tname = thread.name tname = thread.name
ident = thread.ident % 10000 ident = thread.ident % 10000
if tname == "MainThread": if tname == "MainThread":
t = "\x0302MainThread\x0F (id {0})"
t = "\x0302main\x0F (id {0})"
normal_threads.append(t.format(ident)) normal_threads.append(t.format(ident))
elif tname in self.config.components: elif tname in self.config.components:
t = "\x0302{0}\x0F (id {1})" t = "\x0302{0}\x0F (id {1})"
normal_threads.append(t.format(tname, ident)) normal_threads.append(t.format(tname, ident))
elif tname.startswith("remind-"):
t = "\x0302reminder\x0F (id {0})"
daemon_threads.append(t.format(tname[len("remind-"):]))
elif tname.startswith("cvworker-"): elif tname.startswith("cvworker-"):
t = "\x0302copyvio worker\x0F (site {0})" t = "\x0302copyvio worker\x0F (site {0})"
daemon_threads.append(t.format(tname[len("cvworker-"):])) daemon_threads.append(t.format(tname[len("cvworker-"):]))


Loading…
Cancel
Save