Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: py
title: configuration
# Host name of the machine where the RabbitMQ server is running.
rabbitmqhost = "localhost"
 
# Name of the RabbitMQ exchange. connect_message_bus declares it as a
# durable topic exchange and binds the extractor's queue to it.
exchange = "medici"

# Name of this extractor. It is also used as the queue name, so this is
# the name that shows up in the RabbitMQ queue list.
extractorName = "wordCount"

# Username and password to connect to RabbitMQ. When either one is None,
# connect_message_bus connects without explicit credentials.
username = None
password = None

# Routing key used to bind the queue to the exchange: accept any type of
# file that is text. In a topic routing key "*" matches exactly one word
# and "#" matches zero or more words.
messageType = "*.file.text.#"

# Secret key used to connect to medici; this will eventually be
# part of the message received.
# NOTE(review): this is a credential hard-coded in source -- consider
# loading it from the environment or a config file kept out of version
# control.
secretKey = "r1ek3rs"

...

Code Block
language: py
title: connect_message_bus
 def connect_message_bus():
    """Connect to message bus and wait for messages"""
    global extractorName, username, password, messageType, exchange
    # connect to rabbitmq using input username and password
    if (username is None or password is None):
        connection = pika.BlockingConnection()
    else:
        credentials = pika.PlainCredentials(sys.argv[1]username, sys.argv[2]password)
        parameters = pika.ConnectionParameters(host=rabbitmqhost, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
    
    # connect to channel
    channel = connection.channel()
    
    # declare the exchange in case it does not exist
    channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
    
    # declare the queue in case it does not exist
    channel.queue_declare(queue=extractorName, durable=True)
    # connect queue and exchange
    channel.queue_bind(queue=extractorName, exchange=exchange, routing_key=messageType)
    # create listener
    channel.basic_consume(on_message, queue=extractorName, no_ack=False)
    # start listening
    logger.info("Waiting for messages. To exit press CTRL+C")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    # close connection
    connection.close()

...