Merge pull request #158 from alerta/fix-ampq-serialize
Fix AMQP plugin date serializer
This commit is contained in:
commit
469a1e79d3
3 changed files with 59 additions and 2 deletions
plugins/amqp
|
@ -1,4 +1,6 @@
|
|||
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
|
||||
|
@ -20,6 +22,14 @@ AMQP_URL = os.environ.get('REDIS_URL') or os.environ.get('AMQP_URL') or app.conf
|
|||
AMQP_TOPIC = os.environ.get('AMQP_TOPIC') or app.config.get('AMQP_TOPIC', DEFAULT_AMQP_TOPIC)
|
||||
|
||||
|
||||
class DateEncoder(json.JSONEncoder):
|
||||
def default(self, o):
|
||||
if isinstance(o, (datetime.date, datetime.datetime)):
|
||||
return o.replace(microsecond=0).strftime('%Y-%m-%dT%H:%M:%S') + ".%03dZ" % (o.microsecond // 1000)
|
||||
else:
|
||||
return json.JSONEncoder.default(self, o)
|
||||
|
||||
|
||||
class FanoutPublisher(PluginBase):
|
||||
|
||||
def __init__(self, name=None):
|
||||
|
@ -50,7 +60,7 @@ class FanoutPublisher(PluginBase):
|
|||
LOG.info('Sending message %s to AMQP topic "%s"', alert.get_id(), AMQP_TOPIC)
|
||||
|
||||
try:
|
||||
body = alert.serialize # alerta >= 5.0
|
||||
body = json.dumps(alert.serialize, cls=DateEncoder) # alerta >= 5.0
|
||||
except Exception:
|
||||
body = alert.get_body() # alerta < 5.0
|
||||
|
||||
|
|
47
plugins/amqp/listener.py
Executable file
47
plugins/amqp/listener.py
Executable file
|
@ -0,0 +1,47 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from kombu import Connection, Exchange, Queue
|
||||
from kombu.mixins import ConsumerMixin
|
||||
|
||||
AMQP_URL = 'mongodb://localhost:27017/kombu'
|
||||
AMQP_TOPIC = 'notify'
|
||||
|
||||
|
||||
class FanoutConsumer(ConsumerMixin):
|
||||
def __init__(self, conn):
|
||||
self.connection = conn
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
exchange = Exchange(
|
||||
name=AMQP_TOPIC,
|
||||
type='fanout',
|
||||
channel=self.channel,
|
||||
durable=True
|
||||
)
|
||||
queues = [
|
||||
Queue(
|
||||
name='',
|
||||
exchange=exchange,
|
||||
routing_key='',
|
||||
channel=self.channel,
|
||||
exclusive=True
|
||||
)
|
||||
]
|
||||
return [
|
||||
Consumer(queues=queues, accept=['json'], callbacks=[self.on_message])
|
||||
]
|
||||
|
||||
def on_message(self, body, message):
|
||||
try:
|
||||
print(body)
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
message.ack()
|
||||
|
||||
if __name__ == '__main__':
|
||||
from kombu.utils.debug import setup_logging
|
||||
setup_logging(loglevel='DEBUG', loggers=[''])
|
||||
with Connection(AMQP_URL) as conn:
|
||||
consumer = FanoutConsumer(conn)
|
||||
consumer.run()
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
version = '5.4.0'
|
||||
version = '5.4.1'
|
||||
|
||||
setup(
|
||||
name="alerta-amqp",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue