This page describes how to create an extractor using Python. Much of this code can be copied and pasted as is; the only major part you need to write yourself is the process_file function. You can also find this example in our code repository at: https://opensource.ncsa.illinois.edu/stash/projects/MMDB/repos/extractors-examples/browse/python
The first step in the example is to set up some global variables that will be used in the rest of the code.
Code Block | ||||
---|---|---|---|---|
| ||||
# Host name of the machine where RabbitMQ is running.
rabbitmqhost = "localhost"

# Name of the RabbitMQ exchange the extractor's queue is bound to.
exchange = "medici"

# Name of the extractor; also used as the queue name, so this is the
# name shown in the RabbitMQ queue list.
extractorName = "wordCount"

# Username and password used to connect to RabbitMQ. When either one is
# None, the connection is opened without explicit credentials.
username = None
password = None

# Routing key for the queue binding: accept any type of file that is text.
messageType = "*.file.text.#"

# Secret key used to connect to Medici; this will eventually be
# part of the message received.
# NOTE(review): example value only -- avoid hard-coding a real key in source.
secretKey = "r1ek3rs"
...
Code Block | ||||
---|---|---|---|---|
| ||||
def connect_message_bus():
    """Connect to the message bus and wait for messages.

    Opens a blocking connection to the RabbitMQ server at ``rabbitmqhost``,
    declares the durable topic exchange ``exchange`` and the durable queue
    ``extractorName``, binds them with the routing key ``messageType`` and
    then consumes messages, handing each one to ``on_message``.

    The call blocks until the user presses CTRL+C, after which consuming
    is stopped and the connection is closed.

    The module-level settings are only read here, never rebound, so no
    ``global`` statement is required.
    """
    # Connect to RabbitMQ. Explicit credentials are only sent when both a
    # username and a password were configured; otherwise the library's
    # default credentials are used. The configured host is honoured in
    # both cases.
    if username is None or password is None:
        parameters = pika.ConnectionParameters(host=rabbitmqhost)
    else:
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=rabbitmqhost,
                                               credentials=credentials)
    connection = pika.BlockingConnection(parameters)

    # Open a channel on the connection.
    channel = connection.channel()

    # Declare the exchange in case it does not exist yet.
    channel.exchange_declare(exchange=exchange, exchange_type='topic',
                             durable=True)

    # Declare the queue in case it does not exist yet.
    channel.queue_declare(queue=extractorName, durable=True)

    # Bind the queue to the exchange so matching messages are routed to it.
    channel.queue_bind(queue=extractorName, exchange=exchange,
                       routing_key=messageType)

    # Register the listener; no_ack=False means on_message is expected to
    # acknowledge each message explicitly.
    channel.basic_consume(on_message, queue=extractorName, no_ack=False)

    # Start listening; CTRL+C ends the consume loop cleanly.
    logger.info("Waiting for messages. To exit press CTRL+C")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Close the connection once consuming has stopped.
    connection.close()
...