This page describes how to create an extractor using Python. Most of this code can be copied and pasted as is; the only part that is specific to an extractor is the process_file function. This example can also be found in our code repository at:
The first step in the example is to set up some global variables that are used in the rest of the code.
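# modules used by the code on this page
import json
import logging
import os
import subprocess
import sys
import tempfile
import time

import pika
import requests

# name of the rabbitmq exchange to use
exchange = "medici"

# name to show in rabbitmq queue list
extractorName = "wordCount"

# username and password to connect to rabbitmq
username = None
password = None

# accept any type of file that is text
messageType = "*.file.text.#"

# secret key used to connect to medici, this will eventually be
# part of the message received.
secretKey = "r1ek3rs"

# logger used by the functions below (assumed setup, adjust as needed)
logger = logging.getLogger(extractorName)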
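The messageType value is a topic binding key: words are separated by dots, `*` stands for exactly one word and `#` stands for zero or more words. The sketch below is only an illustration of that rule. The function name `topic_matches` and the sample routing keys are made up for this example, and RabbitMQ does the real matching on the broker.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic pattern matches a routing key.

    '*' matches exactly one word, '#' matches zero or more words.
    Illustrative only: the broker performs the real matching.
    """
    def match(p, k):
        if not p:
            # pattern exhausted: match only if the key is exhausted too
            return not k
        head, rest = p[0], p[1:]
        if head == '#':
            # '#' may swallow zero or more words of the key
            return any(match(rest, k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if head == '*' or head == k[0]:
            return match(rest, k[1:])
        return False

    return match(pattern.split('.'), routing_key.split('.'))


# hypothetical routing keys, used only to exercise the pattern
print(topic_matches("*.file.text.#", "medici.file.text.plain"))
print(topic_matches("*.file.text.#", "medici.file.image.png"))
```

Under these assumptions the first key is accepted and the second is rejected, because its third word is not `text`.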
Next is a convenience function that sends status messages back to Medici.
def status_update(channel, header, fileid, status):
    """Send notification on message bus with update"""
    global extractorName, exchange

    logger.debug("[%s] : %s", fileid, status)

    statusreport = {}
    statusreport['file_id'] = fileid
    statusreport['extractor_id'] = extractorName
    statusreport['status'] = status
    statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
    channel.basic_publish(exchange=exchange,
                          routing_key=header.reply_to,
                          properties=pika.BasicProperties(correlation_id=header.correlation_id),
                          body=json.dumps(statusreport))
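To see what ends up on the message bus, the sketch below builds the same status report without a RabbitMQ connection and prints the JSON body. The helper name `build_status_report` and the file id `12345` are placeholders invented for this illustration.

```python
import json
import time


def build_status_report(fileid, extractor_id, status):
    """Build the dictionary that status_update serializes and publishes."""
    return {
        'file_id': fileid,
        'extractor_id': extractor_id,
        'status': status,
        # local time at which the report was created
        'start': time.strftime('%Y-%m-%dT%H:%M:%S'),
    }


# '12345' is a made-up file id
body = json.dumps(build_status_report('12345', 'wordCount', 'Downloading file.'))
print(body)
```

The printed body is a JSON object with the four keys file_id, extractor_id, status and start.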
Next we define a function that connects to the message bus and listens for messages.
def connect_message_bus():
    """Connect to message bus and wait for messages"""
    global extractorName, username, password, messageType, exchange

    # connect to rabbitmq using input username and password
    if (username is None or password is None):
        connection = pika.BlockingConnection()
    else:
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(credentials=credentials)
        connection = pika.BlockingConnection(parameters)

    # connect to channel
    channel = connection.channel()

    # declare the exchange in case it does not exist
    channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)

    # declare the queue in case it does not exist
    channel.queue_declare(queue=extractorName, durable=True)

    # connect queue and exchange
    channel.queue_bind(queue=extractorName, exchange=exchange, routing_key=messageType)

    # create listener (pika 0.x signature; pika 1.0 and later use
    # basic_consume(queue=..., on_message_callback=..., auto_ack=False))
    channel.basic_consume(on_message, queue=extractorName, no_ack=False)

    # start listening
    logger.info("Waiting for messages. To exit press CTRL+C")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # close connection
    connection.close()
Next is the function that handles a message. It calls one function to download the file and another to process it. Only if both download_file and process_file complete without errors does this function ack the message on the message bus, telling RabbitMQ that the file has been processed and that nobody else should process it again. If the ack never happens (for example because the process died partway through), the next extractor listening on this queue will pick up the message and process the file.
def on_message(channel, method, header, body):
    """When message is received do the following steps:
    1. download the file
    2. launch extractor function"""
    global logger, extractorName

    inputfile = None
    fileid = 0

    try:
        # parse body back from json
        jbody = json.loads(body)

        host = jbody['host']
        fileid = jbody['id']
        intermediatefileid = jbody['intermediateId']
        if not (host.endswith('/')):
            host += '/'

        # print what we are doing
        logger.debug("[%s] started processing", fileid)

        # download file
        inputfile = download_file(channel, header, host, secretKey, fileid, intermediatefileid)

        # call actual extractor function
        process_file(channel, header, host, secretKey, fileid, intermediatefileid, inputfile)

        # notify rabbitMQ we are done processing message
        channel.basic_ack(method.delivery_tag)

    except subprocess.CalledProcessError as e:
        # use % formatting; str.format() would leave %d and %s untouched
        msg = "Error processing [exit code=%d]\n%s" % (e.returncode, e.output)
        logger.exception("[%s] %s", fileid, msg)
        status_update(channel, header, fileid, msg)
    except:
        logger.exception("[%s] error processing", fileid)
        status_update(channel, header, fileid, "Error processing")
    finally:
        status_update(channel, header, fileid, "Done")
        # remove the downloaded file, whether or not processing succeeded
        if inputfile is not None:
            try:
                os.remove(inputfile)
            except OSError:
                pass
            except UnboundLocalError:
                pass
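The ack-only-on-success rule can be tried out without a broker. The sketch below uses a stand-in channel that only records which delivery tags were acked. `FakeChannel`, `handle` and `failing_work` are hypothetical names for this illustration and are not part of pika or Medici.

```python
class FakeChannel(object):
    """Stand-in for a pika channel that records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


def handle(channel, delivery_tag, work):
    """Run work() and ack the message only if it did not raise."""
    try:
        work()
        channel.basic_ack(delivery_tag)
        return True
    except Exception:
        # no ack: the message stays unacknowledged on the broker
        return False


def failing_work():
    raise RuntimeError("simulated extractor failure")


channel = FakeChannel()
ok = handle(channel, 1, lambda: None)      # succeeds, so tag 1 is acked
failed = handle(channel, 2, failing_work)  # raises, so tag 2 is not acked
print(channel.acked)
```

Only the delivery tag of the successful call ends up in the acked list, which mirrors how on_message leaves a failed message unacknowledged.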
This function downloads the actual file from Medici so it can be processed by the local extractor. The on_message function takes care of removing the file once processing is complete.
def download_file(channel, header, host, key, fileid, intermediatefileid):
    """Download file to be processed from Medici"""
    status_update(channel, header, fileid, "Downloading file.")

    # fetch data
    url = host + 'api/files/' + intermediatefileid + '?key=' + key
    r = requests.get(url, stream=True)
    r.raise_for_status()

    # write the response to a temporary file in binary mode,
    # since iter_content yields bytes
    (fd, inputfile) = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as f:
        for chunk in r.iter_content(chunk_size=10*1024):
            f.write(chunk)
    return inputfile
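The temporary-file handling can be exercised on its own, without a Medici server. The sketch below takes any iterable of byte chunks in place of the HTTP response. The helper name `save_chunks` and the sample chunks are assumptions made for this illustration.

```python
import os
import tempfile


def save_chunks(chunks):
    """Write an iterable of byte chunks to a new temporary file and return its path."""
    fd, path = tempfile.mkstemp()
    # binary mode, because the chunks are bytes
    with os.fdopen(fd, "wb") as f:
        for chunk in chunks:
            if chunk:  # skip empty chunks
                f.write(chunk)
    return path


# made-up chunks standing in for a streamed download
path = save_chunks([b"hello ", b"", b"world\n"])
with open(path, "rb") as f:
    content = f.read()

# the caller is responsible for cleanup, as on_message is above
os.remove(path)
print(content)
```

As in the extractor, the function that creates the file does not delete it; the caller removes it when it is done.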
Finally, this is the main block of code. It processes the data and sends any extracted metadata or previews back to Medici. The following example sends extracted metadata back to Medici.
def process_file(channel, header, host, key, fileid, intermediatefileid, inputfile):
    """Count the number of words in text file"""
    status_update(channel, header, fileid, "Counting words in file.")

    # call actual program
    result = subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT)
    # decode the output and keep the three counts; the file name is not needed
    (lines, words, characters) = result.decode('utf-8').split()[:3]

    # store results as metadata
    metadata = {}
    metadata['lines'] = lines
    metadata['words'] = words
    metadata['characters'] = characters

    # send the metadata back to medici
    headers = {'Content-Type': 'application/json'}
    r = requests.post('%sapi/files/%s/metadata?key=%s' % (host, fileid, key),
                      headers=headers, data=json.dumps(metadata))
    r.raise_for_status()
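The parsing step can be checked separately from the call to wc and the upload. The sketch below turns one line of wc output into the metadata dictionary. The helper name `parse_wc_output` and the sample output line are made up for this illustration, and the counts are converted to integers here, which the example above does not do. Note that wc without options reports bytes in its third column, which the example stores under the key characters.

```python
def parse_wc_output(output):
    """Turn one line of `wc` output into a metadata dictionary."""
    if isinstance(output, bytes):
        output = output.decode('utf-8')
    # the first three fields are the counts; anything after is the file name
    lines, words, characters = output.split()[:3]
    return {
        'lines': int(lines),
        'words': int(words),
        'characters': int(characters),
    }


# made-up sample of what wc might print for a temporary file
sample = b"  3  10  57 /tmp/tmpabc123\n"
metadata = parse_wc_output(sample)
print(metadata)
```

Taking only the first three fields keeps the parsing correct even when the file name contains spaces.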
For