def connect_message_bus():
    """Connect to the message bus and block while consuming messages.

    Reads the module-level settings ``username``, ``password``,
    ``rabbitmqhost``, ``exchange``, ``extractorName`` and ``messageType``.
    Declares a durable topic exchange and a durable queue named after the
    extractor, binds them with ``messageType`` as the routing key, registers
    ``on_message`` as the consumer callback and then consumes until
    interrupted with CTRL+C.

    The connection is closed on every exit path; exceptions other than
    ``KeyboardInterrupt`` are propagated to the caller after cleanup.
    """
    # Connect to RabbitMQ with the configured username and password.
    if username is None or password is None:
        # No credentials configured: use pika's default connection parameters.
        # NOTE(review): rabbitmqhost is not applied in this branch -- confirm
        # that falling back to the default host is intended.
        connection = pika.BlockingConnection()
    else:
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=rabbitmqhost,
                                               credentials=credentials)
        connection = pika.BlockingConnection(parameters)

    try:
        # connect to channel
        channel = connection.channel()

        # declare the exchange in case it does not exist
        channel.exchange_declare(exchange=exchange, exchange_type='topic',
                                 durable=True)

        # declare the queue in case it does not exist
        channel.queue_declare(queue=extractorName, durable=True)

        # connect queue and exchange
        channel.queue_bind(queue=extractorName, exchange=exchange,
                           routing_key=messageType)

        # create listener; no_ack=False means deliveries must be acknowledged
        # explicitly (presumably by on_message -- verify against the callback)
        channel.basic_consume(on_message, queue=extractorName, no_ack=False)

        # start listening
        logger.info("Waiting for messages. To exit press CTRL+C")
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    finally:
        # Close the connection on every exit path. The is_open guard keeps a
        # close attempt on an already-dead connection from masking the
        # exception that is propagating.
        if connection.is_open:
            connection.close()
|