Hey all, I know it has been a long time since I posted any articles on my site due to an extremely busy schedule. But I want to start sharing my experience again so that someone can benefit from it.
Over the last week or so I have been tasked with creating an event-driven architecture that handles 10,000+ EPS (events per second). I chose a message queue as the main bus through which all messages are streamed. For this I used RabbitMQ and pika (the system is written in Python). One of the key challenges was creating an asynchronous publisher that is thread-safe at 10,000+ EPS. Searching the net, I could not find a good resource for this, but I found several supporting articles that helped me build it out.
Below is a simple Python pika publisher that publishes messages asynchronously in a thread-safe environment.
"""Asynchronous, thread-safe RabbitMQ publisher built on pika's SelectConnection.

NOTE(review): the channel/exchange/queue calls below use the pika 0.x
callback-first signatures (e.g. ``exchange_declare(callback, name)``) and rely
on ``ioloop.add_callback_threadsafe``, which presumably requires pika 0.12 or
0.13; pika 1.x changed these signatures -- confirm the pinned pika version.
"""
import functools
import json
import logging
import time
from logging.handlers import RotatingFileHandler
from threading import Thread, current_thread

import pika

try:  # Python 3
    from urllib.parse import quote
except ImportError:  # Python 2
    from urllib import quote


class Publisher(Thread):
    """Thread that owns one RabbitMQ connection and publishes messages on it.

    pika connections and channels must only be used from the thread that runs
    their IO loop. This class therefore never touches pika from the caller's
    thread: ``publish_message`` captures the body and schedules the actual
    ``basic_publish`` on the IO loop with ``add_callback_threadsafe``.

    Lifecycle: ``start()`` runs ``run()``, which connects, opens a channel,
    declares the exchange and a durable queue, binds them and then sets
    ``ready``. After an unexpected disconnect ``ready`` is cleared and
    ``run()`` reconnects after ``RECONNECT_DELAY`` seconds. ``stop()`` closes
    the channel and connection and waits for the thread to exit.

    Delivery is best-effort: publisher confirms are not enabled, and messages
    handed over while the connection is down are dropped (and logged).
    """

    # Accepted values of LOG4PY_SETTINGS['log4py_log_level'].
    LOG_LEVELS = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARN': logging.WARN,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
    }
    # Seconds to wait before reconnecting after an unexpected disconnect.
    RECONNECT_DELAY = 5

    def __init__(self, RABBITMQ_SETTINGS, LOG4PY_SETTINGS):
        """Configure logging and connection details; does not connect yet.

        :param RABBITMQ_SETTINGS: dict with required keys 'user', 'passwd' and
            'host'; optional 'port' (default 5672), 'vhost' (default '/'),
            'queue', 'exchange', 'routing_key' (each default
            'alienvault_replicate') and 'delivery_mode' (default 1).
        :param LOG4PY_SETTINGS: dict with 'log4py_file' (path of the rotating
            log file) and 'log4py_log_level' (a key of LOG_LEVELS).
        :raises KeyError: if a required setting is missing.
        :raises ValueError: if 'log4py_log_level' is not a key of LOG_LEVELS.
        """
        Thread.__init__(self)
        self.logger = logging.getLogger("Publisher.py")
        self.connection = None
        self.channel = None
        # Publisher-confirm bookkeeping; currently unused (no confirm_delivery).
        self._deliveries = []
        self._acked = 0
        self._nacked = 0
        self._message_number = 0
        self._stopping = False
        self._closing = False
        self.queue = RABBITMQ_SETTINGS.get('queue', 'alienvault_replicate')
        self.routing_key = RABBITMQ_SETTINGS.get('routing_key', 'alienvault_replicate')
        self.exchange = RABBITMQ_SETTINGS.get('exchange', 'alienvault_replicate')
        self.message = None
        self.ready = False
        self.PUBLISH_INTERVAL = 0.1  # kept for compatibility; not used here
        self.RABBITMQ_SETTINGS = RABBITMQ_SETTINGS
        # AMQP delivery_mode: 1 = transient, 2 = persistent.
        self.delivery_mode = RABBITMQ_SETTINGS.get('delivery_mode', 1)
        # Identical for every message, so build it once instead of per publish.
        self._properties = pika.BasicProperties(delivery_mode=self.delivery_mode)

        log4py_file = LOG4PY_SETTINGS['log4py_file']
        log4py_log_level = LOG4PY_SETTINGS['log4py_log_level']
        try:
            self.log_level = self.LOG_LEVELS[log4py_log_level]
        except KeyError:
            raise ValueError(
                'Unsupported log4py_log_level {0!r}; expected one of {1}'.format(
                    log4py_log_level, sorted(self.LOG_LEVELS)))
        self.logger.setLevel(self.log_level)
        rfh = RotatingFileHandler(filename=log4py_file, mode='a',
                                  maxBytes=100 * 1024 * 1024, backupCount=2)
        rfh.setLevel(self.log_level)
        rfh.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        self.logger.addHandler(rfh)

        self._host = RABBITMQ_SETTINGS['host']
        self._port = int(RABBITMQ_SETTINGS.get('port', 5672))
        # Percent-encode credentials and vhost so characters such as '@', ':'
        # or '/' cannot break the URL; the default vhost '/' becomes '%2F'.
        self._url = 'amqp://{0}:{1}@{2}:{3}/{4}'.format(
            quote(RABBITMQ_SETTINGS['user'], safe=''),
            quote(RABBITMQ_SETTINGS['passwd'], safe=''),
            self._host,
            self._port,
            quote(RABBITMQ_SETTINGS.get('vhost', '/'), safe=''))

    def is_ready(self):
        """Return True once the exchange/queue binding is done and not lost since."""
        return self.ready

    def set_message(self, message):
        """Store *message* as the default body for publish_message()."""
        self.message = message
        # DEBUG, not INFO: logging every message is too costly at high rates.
        self.logger.debug('Message set to publish to %s', self.message)

    def connect(self):
        """Create a SelectConnection; its callbacks run on the IO loop in run()."""
        # Log host/port only: the URL contains the password.
        self.logger.info('Connecting to %s:%s', self._host, self._port)
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     on_open_callback=self.onconnection_open)

    def close_connection(self):
        """Deliberately close the connection (IO-loop thread only); no reconnect."""
        self.logger.info('Closing connection')
        self._closing = True
        if self.connection is None:
            return
        if self.connection.is_closed:
            # No close callback will fire any more; make sure the loop exits.
            self.connection.ioloop.stop()
        elif not self.connection.is_closing:
            self.connection.close()

    def add_onconnection_close_callback(self):
        """Register onconnection_closed on the current connection."""
        self.logger.info('Adding connection close callback')
        self.connection.add_on_close_callback(self.onconnection_closed)

    def onconnection_closed(self, connection, reply_code, reply_text):
        """Connection closed: stop the loop, and let run() reconnect unless closing."""
        self.channel = None
        self.ready = False
        if self._closing:
            connection.ioloop.stop()
        else:
            self.logger.warning('Connection closed, reopening in %s seconds: (%s) %s',
                                self.RECONNECT_DELAY, reply_code, reply_text)
            self.reconnect()

    def onconnection_open(self, unusedconnection):
        """Connection is up: register the close callback and open a channel."""
        self.logger.info('Connection opened')
        self.add_onconnection_close_callback()
        if self._stopping:
            # stop() may have run before this connection existed; honour it now.
            self.close_connection()
            return
        self.openchannel()

    def reconnect(self):
        """Stop the current IO loop; run() then opens a fresh connection.

        Reconnecting from run()'s loop avoids starting a nested IO loop inside
        a callback, which would grow the stack on every reconnect.
        """
        self.connection.ioloop.stop()

    def add_onchannel_close_callback(self):
        """Register onchannel_closed on the current channel."""
        self.logger.info('Adding channel close callback')
        self.channel.add_on_close_callback(self.onchannel_closed)

    def onchannel_closed(self, channel, reply_code, reply_text):
        """Channel closed: unless shutting down, close the connection to reconnect."""
        self.logger.warning('Channel was closed: (%s) %s', reply_code, reply_text)
        self.channel = None
        self.ready = False
        if self._closing or self.connection is None:
            return
        if not (self.connection.is_closing or self.connection.is_closed):
            self.connection.close()

    def onchannel_open(self, channel):
        """Channel is open: remember it and start declaring the exchange."""
        self.logger.info('Channel opened')
        self.channel = channel
        self.add_onchannel_close_callback()
        self.setup_exchange(self.exchange)

    def setup_exchange(self, exchange_name):
        """Declare *exchange_name*; continues in on_exchange_declareok."""
        self.logger.info('Declaring exchange %s', exchange_name)
        self.channel.exchange_declare(self.on_exchange_declareok, exchange_name)

    def on_exchange_declareok(self, unused_frame):
        """Exchange declared: declare the queue next."""
        self.logger.info('Exchange declared')
        self.setup_queue(self.queue)

    def setup_queue(self, queue_name):
        """Declare the durable queue *queue_name*; continues in on_queue_declareok."""
        self.logger.info('Declaring queue %s', queue_name)
        self.channel.queue_declare(self.on_queue_declareok, queue_name, durable=True)

    def on_queue_declareok(self, method_frame):
        """Queue declared: bind it to the exchange with the routing key."""
        self.logger.info('Binding %s to %s with %s',
                         self.exchange, self.queue, self.routing_key)
        self.channel.queue_bind(self.on_bindok, self.queue, self.exchange,
                                self.routing_key)

    def publish_message(self, message=None):
        """Schedule a message for publishing; safe to call from any thread.

        The body is captured now and handed to the IO-loop thread, which runs
        the actual basic_publish, so concurrent callers cannot overwrite each
        other's message.

        :param message: body to publish; defaults to the one stored by
            set_message().
        :returns: True if the publish was scheduled, False if it was dropped
            (stopping, nothing to send, or no connection yet).
        """
        if self._stopping:
            return False
        body = self.message if message is None else message
        if body is None:
            return False
        connection = self.connection
        if connection is None:
            self.logger.warning('No connection yet; message dropped')
            return False
        connection.ioloop.add_callback_threadsafe(
            functools.partial(self._publish, body))
        return True

    def _publish(self, body):
        """Publish *body* on the current channel; IO-loop thread only."""
        if self.channel is None:
            self.logger.warning('Channel not open; message dropped')
            return
        try:
            self.channel.basic_publish(self.exchange, self.routing_key, body,
                                       self._properties)
        except Exception as err:
            # Log and drop this message. A lost channel/connection is handled by
            # the close callbacks; one bad message must not wedge the publisher.
            self.logger.error('Error in sending message ... %s', err)
            return
        self._message_number += 1
        self.logger.debug('Published message # %i', self._message_number)

    def start_publishing(self):
        """Mark the publisher ready.

        Does not publish by itself: re-sending the stored message here would
        duplicate a message the caller publishes once it sees is_ready().
        """
        self.logger.info('Ready to publish messages')
        self.ready = True

    def on_bindok(self, unused_frame):
        """Queue bound: publishing may start."""
        self.logger.info('Queue bound')
        self.start_publishing()

    def closechannel(self):
        """Close the channel if it is open (IO-loop thread only)."""
        self.logger.info('Closing the channel')
        if self.channel is not None and self.channel.is_open:
            self.channel.close()

    def openchannel(self):
        """Ask the connection for a new channel; continues in onchannel_open."""
        self.logger.info('Creating a new channel')
        self.connection.channel(on_open_callback=self.onchannel_open)

    def run(self):
        """Thread body: keep a connection's IO loop running until stopped."""
        while not self._stopping:
            try:
                self.connection = self.connect()
                self.connection.ioloop.start()
            except pika.exceptions.AMQPConnectionError as err:
                # Presumably raised by pika's default open-error handling;
                # retry instead of letting the thread die.
                self.logger.error('Could not connect to RabbitMQ: %s', err)
            self.ready = False
            if self._closing or self._stopping:
                break
            time.sleep(self.RECONNECT_DELAY)

    def _shutdown(self):
        """Close channel and connection; must run on the IO-loop thread."""
        self.closechannel()
        self.close_connection()

    def stop(self):
        """Stop publishing, close the connection and wait for the thread to exit.

        Safe to call from any thread: the close itself is scheduled on the IO
        loop. NOTE(review): whether publishes scheduled just before stop() are
        sent first depends on pika running add_callback_threadsafe callbacks in
        FIFO order -- verify for the pinned pika version.
        """
        self.logger.info('Stopping')
        self._stopping = True
        connection = self.connection
        if connection is not None:
            connection.ioloop.add_callback_threadsafe(self._shutdown)
        if self.is_alive() and current_thread() is not self:
            self.join()
        self.logger.info('Stopped')


if __name__ == '__main__':
    RABBITMQ_SETTINGS = {"user": "user", "passwd": "pw", "host": "xxx.xxx.xxx.xxx"}
    LOG4PY_SETTINGS = {"log4py_file": "./log4py.log", "log4py_log_level": "INFO"}
    publisher = Publisher(RABBITMQ_SETTINGS, LOG4PY_SETTINGS)
    publisher.start()
    try:
        for i in range(1, 100000):
            # Build the payload with json.dumps so it is always valid JSON.
            message = json.dumps({"message": "Hello Fellasss....{0}".format(i)})
            while not publisher.is_ready():
                if not publisher.is_alive():
                    raise RuntimeError('Publisher thread exited unexpectedly')
                time.sleep(.001)
            publisher.publish_message(message)
    except KeyboardInterrupt:
        pass
    finally:
        publisher.stop()