This page describes how to create an extractor in Python. Most of this code can be copied and pasted as-is; the main part you need to write yourself is the process_file function. You can also find this example in our code repository at: https://opensource.ncsa.illinois.edu/stash/projects/MMDB/repos/extractors-examples/browse/python
The first step in the example is to set up some global variables that are used by the rest of the code.
Code block:
# host name where rabbitmq is running
rabbitmqhost = "localhost"
# name of the (topic) exchange the extractor's queue is bound to
exchange = "medici"
# name of the queue the extractor listens on; this is the name shown in
# the rabbitmq queue list
extractorName = "wordCount"
# username and password to connect to rabbitmq; if either is None the
# connection is made without explicit credentials
username = None
password = None
# routing key used to bind the queue: accept any type of file that is text
messageType = "*.file.text.#"
# secret key used to connect to medici, this will eventually be
# part of the message received.
secretKey = "r1ek3rs"
...
Code block:
def connect_message_bus():
    """Connect to the message bus and wait for messages.

    Declares the durable topic exchange ``exchange`` and a durable queue named
    ``extractorName``, binds them with ``messageType`` as routing key and then
    blocks, dispatching every message to ``on_message`` until CTRL+C is
    pressed. The connection is always closed on exit.
    """
    # connect to rabbitmq on rabbitmqhost; only pass credentials when both a
    # username and a password are configured (otherwise pika's defaults apply)
    if username is None or password is None:
        parameters = pika.ConnectionParameters(host=rabbitmqhost)
    else:
        credentials = pika.PlainCredentials(username, password)
        parameters = pika.ConnectionParameters(host=rabbitmqhost,
                                               credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        # connect to channel
        channel = connection.channel()

        # declare the exchange in case it does not exist
        channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)

        # declare the queue in case it does not exist
        channel.queue_declare(queue=extractorName, durable=True)

        # connect queue and exchange
        channel.queue_bind(queue=extractorName, exchange=exchange, routing_key=messageType)

        # create listener; messages must be acked explicitly by on_message
        # NOTE(review): positional callback + no_ack is the pika 0.x API;
        # pika >= 1.0 uses on_message_callback=/auto_ack= instead.
        channel.basic_consume(on_message, queue=extractorName, no_ack=False)

        # start listening
        logger.info("Waiting for messages. To exit press CTRL+C")
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    finally:
        # close connection even if declaring/binding/consuming failed
        connection.close()
...
Code block:
def on_message(channel, method, header, body):
    """Handle one message received from the message bus.

    Steps:
      1. parse the JSON body for ``host``, ``id`` and ``intermediateId``
      2. download the file
      3. launch the extractor function (``process_file``)
      4. acknowledge the message

    Errors are logged and reported through ``status_update``. A "Done" status
    is always sent and the downloaded file is always removed.
    """
    global logger
    inputfile = None
    fileid = 0
    try:
        # parse body back from json
        jbody = json.loads(body)
        host = jbody['host']
        fileid = jbody['id']
        intermediatefileid = jbody['intermediateId']
        if not host.endswith('/'):
            host += '/'

        # tell everybody we are starting to process the file
        logger.debug("[%s] started processing", fileid)
        status_update(channel, header, fileid, "Started processing file")

        # download file
        inputfile = download_file(channel, header, host, secretKey, fileid, intermediatefileid)

        # call actual extractor function
        process_file(channel, header, host, secretKey, fileid, intermediatefileid, inputfile)

        # notify rabbitMQ we are done processing the message
        # NOTE(review): on error the message is never acked; confirm intended.
        channel.basic_ack(method.delivery_tag)
    except subprocess.CalledProcessError as e:
        # %-formatting: str.format() would leave the %d/%s placeholders as-is
        msg = "Error processing [exit code=%d]\n%s" % (e.returncode, e.output)
        logger.exception("[%s] %s", fileid, msg)
        status_update(channel, header, fileid, msg)
    except Exception:
        # not a bare except: KeyboardInterrupt/SystemExit must propagate so
        # the consumer loop can be stopped with CTRL+C
        logger.exception("[%s] error processing", fileid)
        status_update(channel, header, fileid, "Error processing")
    finally:
        try:
            status_update(channel, header, fileid, "Done")
        finally:
            # remove the downloaded file even if the status update fails
            if inputfile is not None:
                try:
                    os.remove(inputfile)
                except OSError:
                    pass
...
Code block:
def process_file(channel, header, host, key, fileid, intermediatefileid, inputfile):
    """Count the number of lines, words and characters in a text file.

    Runs ``wc`` on *inputfile* and posts the three counts (as strings) as
    JSON metadata to ``{host}api/files/{fileid}/metadata?key={key}``.

    Raises subprocess.CalledProcessError if ``wc`` fails and
    requests.HTTPError if the metadata upload is rejected.
    """
    status_update(channel, header, fileid, "Counting words in file.")

    # call actual program; output is "lines words characters filename"
    result = subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT)
    # split at most 3 times so a filename containing spaces stays one field
    counts = result.split(None, 3)[:3]
    # check_output returns bytes on Python 3, which json cannot serialize
    (lines, words, characters) = [c.decode('ascii') if isinstance(c, bytes) else c
                                  for c in counts]

    # store results as metadata
    metadata = {
        'lines': lines,
        'words': words,
        'characters': characters,
    }
    headers = {'Content-Type': 'application/json'}
    r = requests.post('%sapi/files/%s/metadata?key=%s' % (host, fileid, key),
                      headers=headers, data=json.dumps(metadata))
    r.raise_for_status()
...
The following code creates a resized thumbnail image of an incoming file (using ImageMagick's `convert`) and uploads it as the thumbnail of that file.
Code block:
def create_image_thumbnail(inputfile, ext, size, host, fileid, *args):
    """Create a resized thumbnail of *inputfile* and attach it to a file.

    Uses ImageMagick ``convert`` to resize *inputfile* to *size* into a
    temporary ``.<ext>`` file, uploads it to ``{host}api/fileThumbnail`` and
    associates the returned thumbnail id with *fileid*. The temporary file is
    always removed. Extra positional arguments (*args) are accepted and ignored.

    Raises subprocess.CalledProcessError if ``convert`` fails, Exception if the
    generated thumbnail is empty, and requests.HTTPError if an upload fails.
    """
    global logger
    (fd, thumbnailfile) = tempfile.mkstemp(suffix='.' + ext)
    try:
        # mkstemp returns an open descriptor; convert writes by path, so close it
        os.close(fd)

        # convert image to right size
        subprocess.check_output(['convert', inputfile, '-resize', size, thumbnailfile],
                                stderr=subprocess.STDOUT)
        if os.path.getsize(thumbnailfile) == 0:
            raise Exception("File is empty.")

        # upload preview image; the handle is closed as soon as the upload ends
        url = host + 'api/fileThumbnail?key=' + playserverKey
        with open(thumbnailfile, 'rb') as thumbnail:
            r = requests.post(url, files={"File": thumbnail})
        r.raise_for_status()
        thumbnailid = r.json()['id']

        # associate uploaded thumbnail with original file
        url = host + 'api/files/' + fileid + '/thumbnails/' + thumbnailid + '?key=' + playserverKey
        r = requests.post(url)
        r.raise_for_status()
        logger.debug("[%s] created thumbnail of type %s", fileid, ext)
    finally:
        # best-effort cleanup of the temporary thumbnail
        try:
            os.remove(thumbnailfile)
        except OSError:
            pass