Skip to content
This repository was archived by the owner on Nov 26, 2018. It is now read-only.

Commit fd0c860

Browse files
committed
Example using Celery for plugin queue
1 parent 487c870 commit fd0c860

File tree

10 files changed

+51
-80
lines changed

10 files changed

+51
-80
lines changed

Procfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
web: manage.py runserver $WEB_PORT
2-
plugins: manage.py run_plugins
2+
plugins: celery -A botbot worker -l info
33
bot: botbot-bot -v=2 -logtostderr=true

botbot/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from __future__ import absolute_import
2+
3+
# This will make sure the app is always imported when
4+
# Django starts so that shared_task will use this app.
5+
from .celery import app as celery_app

botbot/apps/plugins/management/__init__.py

Whitespace-only changes.

botbot/apps/plugins/management/commands/__init__.py

Whitespace-only changes.

botbot/apps/plugins/management/commands/run_plugins.py

Lines changed: 0 additions & 18 deletions
This file was deleted.

botbot/apps/plugins/runner.py

Lines changed: 12 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,7 @@ class PluginRunner(object):
131131
Calls to plugins are done via greenlets
132132
"""
133133

134-
def __init__(self, use_gevent=False):
135-
if use_gevent:
136-
import gevent
137-
self.gevent = gevent
134+
def __init__(self):
138135
self.bot_bus = redis.StrictRedis.from_url(
139136
settings.REDIS_PLUGIN_QUEUE_URL)
140137
self.storage = redis.StrictRedis.from_url(
@@ -176,32 +173,15 @@ def register(self, plugin):
176173
getattr(self, attr.route_rule[0] + '_router').setdefault(
177174
plugin.slug, []).append((attr.route_rule[1], attr, plugin))
178175

179-
def listen(self):
180-
"""Listens for incoming messages on the Redis queue"""
181-
while 1:
182-
val = None
183-
try:
184-
val = self.bot_bus.blpop('q', 1)
185-
186-
# Track q length
187-
ql = self.bot_bus.llen('q')
188-
statsd.gauge(".".join(["plugins", "q"]), ql)
189-
190-
if val:
191-
_, val = val
192-
LOG.debug('Recieved: %s', val)
193-
line = Line(json.loads(val), self)
194-
195-
# Calculate the transport latency between go and the plugins.
196-
delta = datetime.utcnow().replace(tzinfo=utc) - line._received
197-
statsd.timing(".".join(["plugins", "latency"]),
198-
delta.total_seconds() * 1000)
176+
def process_line(self, line_json):
177+
LOG.debug('Recieved: %s', line_json)
178+
line = Line(json.loads(line_json), self)
179+
# Calculate the transport latency between go and the plugins.
180+
delta = datetime.utcnow().replace(tzinfo=utc) - line._received
181+
statsd.timing(".".join(["plugins", "latency"]),
182+
delta.total_seconds() * 1000)
183+
self.dispatch(line)
199184

200-
self.dispatch(line)
201-
except Exception:
202-
LOG.error("Line Dispatch Failed", exc_info=True, extra={
203-
"line": val
204-
})
205185

206186
def dispatch(self, line):
207187
"""Given a line, dispatch it to the right plugins & functions."""
@@ -214,16 +194,11 @@ def dispatch(self, line):
214194
# firehose gets everything, no rule matching
215195
LOG.info('Match: %s.%s', plugin_slug, func.__name__)
216196
with statsd.timer(".".join(["plugins", plugin_slug])):
217-
# FIXME: This will not have correct timing if go back to
218-
# gevent.
219197
channel_plugin = self.setup_plugin_for_channel(
220198
plugin.__class__, line)
221199
new_func = log_on_error(LOG, getattr(channel_plugin,
222200
func.__name__))
223-
if hasattr(self, 'gevent'):
224-
self.gevent.Greenlet.spawn(new_func, line)
225-
else:
226-
channel_plugin.respond(new_func(line))
201+
channel_plugin.respond(new_func(line))
227202

228203
# pass line to other routers
229204
if line._is_message:
@@ -252,30 +227,12 @@ def check_for_plugin_route_matches(self, line, router):
252227
if match:
253228
LOG.info('Match: %s.%s', plugin_slug, func.__name__)
254229
with statsd.timer(".".join(["plugins", plugin_slug])):
255-
# FIXME: This will not have correct timing if go back to
256-
# gevent.
257230
# Instantiate a plugin specific to this channel
258231
channel_plugin = self.setup_plugin_for_channel(
259232
plugin.__class__, line)
260233
# get the method from the channel-specific plugin
261234
new_func = log_on_error(LOG, getattr(channel_plugin,
262235
func.__name__))
263-
if hasattr(self, 'gevent'):
264-
grnlt = self.gevent.Greenlet(new_func, line,
265-
**match.groupdict())
266-
grnlt.link_value(channel_plugin.greenlet_respond)
267-
grnlt.start()
268-
else:
269-
channel_plugin.respond(new_func(line,
270-
**match.groupdict()))
271236

272-
273-
def start_plugins(*args, **kwargs):
274-
"""
275-
Used by the management command to start-up plugin listener
276-
and register the plugins.
277-
"""
278-
LOG.info('Starting plugins. Gevent=%s', kwargs['use_gevent'])
279-
app = PluginRunner(**kwargs)
280-
app.register_all_plugins()
281-
app.listen()
237+
channel_plugin.respond(new_func(line,
238+
**match.groupdict()))

botbot/apps/plugins/tasks.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from botbot.celery import app
2+
from .runner import PluginRunner
3+
4+
5+
runner = PluginRunner()
6+
runner.register_all_plugins()
7+
8+
@app.task(bind=True)
9+
def route_line(self, line_json):
10+
try:
11+
runner.process_line(line_json)
12+
# For any error we retry after 10 seconds.
13+
except Exception as exc:
14+
raise self.retry(exc, countdown=10)

botbot/celery.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from __future__ import absolute_import
2+
3+
import os
4+
5+
from celery import Celery
6+
7+
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'botbot.settings')
8+
9+
from django.conf import settings
10+
11+
app = Celery('botbot')
12+
13+
# Using a string here means the worker will not have to
14+
# pickle the object when using Windows.
15+
app.config_from_object('django.conf:settings')
16+
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

botbot/settings/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,9 @@
272272
# Third party app settings
273273
# ==============================================================================
274274

275-
# SOUTH_DATABASE_ADAPTERS = {'default': 'south.db.postgresql_psycopg2'}
275+
CELERY_TASK_SERIALIZER='json'
276+
CELERY_ACCEPT_CONTENT=['json']
277+
BROKER_URL = REDIS_PLUGIN_QUEUE_URL
276278

277279
SOCIAL_AUTH_USER_MODEL = AUTH_USER_MODEL
278280
SOCIAL_AUTH_PROTECTED_USER_FIELDS = ['email']

manage.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@
22
import sys
33

44
if __name__ == "__main__":
5-
if (len(sys.argv) > 1 and
6-
'run_plugins' in sys.argv and '--with-gevent' in sys.argv):
7-
# import gevent as soon as possible
8-
from gevent import monkey; monkey.patch_all()
9-
from psycogreen.gevent import patch_psycopg; patch_psycopg()
105

116
import os
127
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "botbot.settings")

0 commit comments

Comments
 (0)