Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

This page describes how to create an extractor using Python. Most of this code can be copied and pasted as is; the only major part that needs to change is the process_file function. You can also find this example in our code repository at:  https://opensource.ncsa.illinois.edu/stash/projects/MMDB/repos/extractors-examples/browse/python

The first step in the example is to set up some global variables that will be used in the rest of the code.

Code Block
languagepy
titleconfiguration
# hostname of the machine where rabbitmq is running
rabbitmqhost = "localhost"

# name of the rabbitmq exchange that the extractor queue is bound to
exchange = "medici"

# name of the extractor; also used as the queue name shown in the
# rabbitmq queue list
extractorName = "wordCount"

# username and password to connect to rabbitmq; if either is None the
# connection is made without explicit credentials
username = None
password = None

# routing key used to bind the queue to the exchange:
# accept any type of file that is text
messageType = "*.file.text.#"

# secret key used to connect to medici, this will eventually be
# part of the message received.
secretKey = "r1ek3rs"

...

Code Block
languagepy
titleconnect_message_bus
 def connect_message_bus():
    """Connect to message bus and wait for messages"""
    global extractorName, username, password, messageType, exchange
    # connect to rabbitmq using input username and password
    if (username is None or password is None):
        connection = pika.BlockingConnection()
    else:
        credentials = pika.PlainCredentials(sys.argv[1]username, sys.argv[2]password)
        parameters = pika.ConnectionParameters(host=rabbitmqhost, credentials=credentials)
        connection = pika.BlockingConnection(parameters)
    
    # connect to channel
    channel = connection.channel()
    
    # declare the exchange in case it does not exist
    channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)
    
    # declare the queue in case it does not exist
    channel.queue_declare(queue=extractorName, durable=True)
    # connect queue and exchange
    channel.queue_bind(queue=extractorName, exchange=exchange, routing_key=messageType)
    # create listener
    channel.basic_consume(on_message, queue=extractorName, no_ack=False)
    # start listening
    logger.info("Waiting for messages. To exit press CTRL+C")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    # close connection
    connection.close()

...

Code Block
languagepy
titleon_message
def on_message(channel, method, header, body):
    """When a message is received do the following steps:
    1. download the file
    2. launch extractor function

    Arguments are the ones passed to a consumer callback: the channel the
    message arrived on, the delivery method, the message header and the
    message body. The body is parsed as JSON and must contain the keys
    'host', 'id' and 'intermediateId'.

    The message is acknowledged only when processing finished without an
    error. Errors are logged and reported through status_update instead of
    being raised. A final "Done" status is always sent and the downloaded
    file is always removed.
    """
    inputfile = None
    fileid = 0

    try:
        # parse body back from json
        jbody = json.loads(body)
        host = jbody['host']
        fileid = jbody['id']
        intermediatefileid = jbody['intermediateId']
        # URLs are built by appending to host, so make sure it ends with '/'
        if not host.endswith('/'):
            host += '/'

        # tell everybody we are starting to process the file
        status_update(channel, header, fileid, "Started processing file")

        # download file
        inputfile = download_file(channel, header, host, secretKey, fileid, intermediatefileid)

        # call actual extractor function
        process_file(channel, header, host, secretKey, fileid, intermediatefileid, inputfile)

        # notify rabbitMQ we are done processing message
        channel.basic_ack(method.delivery_tag)
    except subprocess.CalledProcessError as e:
        # %-formatting is required here: str.format() ignores %d / %s
        # placeholders and would drop the exit code and the output
        msg = "Error processing [exit code=%d]\n%s" % (e.returncode, e.output)
        logger.exception("[%s] %s", fileid, msg)
        status_update(channel, header, fileid, msg)
    except Exception:
        # KeyboardInterrupt / SystemExit are deliberately not caught so that
        # CTRL+C still stops the extractor while a file is being processed
        logger.exception("[%s] error processing", fileid)
        status_update(channel, header, fileid, "Error processing")
    finally:
        status_update(channel, header, fileid, "Done")
        # best-effort cleanup of the downloaded file
        if inputfile is not None:
            try:
                os.remove(inputfile)
            except OSError:
                pass

...

Code Block
languagepy
titleprocess_file
def process_file(channel, header, host, key, fileid, intermediatefileid, inputfile):
    """Count the number of lines, words and characters in a text file.

    Runs ``wc`` on inputfile and posts the three counts as JSON metadata
    (keys 'lines', 'words' and 'characters') to
    ``<host>api/files/<fileid>/metadata``.

    channel, header -- passed on to status_update to report progress
    host            -- base URL of the server, expected to end with '/'
    key             -- secret key added to the request URL
    fileid          -- id of the file the metadata is attached to
    intermediatefileid -- not used by this extractor
    inputfile       -- path of the local file to count

    Raises subprocess.CalledProcessError if wc fails and the error raised
    by requests' raise_for_status() if the upload is rejected.
    """
    status_update(channel, header, fileid, "Counting words in file.")

    # call actual program; universal_newlines makes the output text rather
    # than bytes on Python 3 so it can be serialized to JSON
    result = subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT,
                                     universal_newlines=True)

    # wc prints "<lines> <words> <characters> <filename>"; only take the
    # three counts so a filename containing whitespace cannot break the
    # unpacking
    lines, words, characters = result.split()[:3]

    # store results as metadata
    metadata = {
        'lines': lines,
        'words': words,
        'characters': characters,
    }
    headers = {'Content-Type': 'application/json'}
    r = requests.post('%sapi/files/%s/metadata?key=%s' % (host, fileid, key),
                      headers=headers,
                      data=json.dumps(metadata))
    r.raise_for_status()

...

The following code will process an incoming image and upload a thumbnail of the image.

Code Block
languagepy
titlecreate_image_thumbnail
def create_image_thumbnail(inputfile, ext, size, host, fileid, *args):
    """Create a thumbnail of an image and attach it to the original file.

    Resizes inputfile with the ``convert`` program into a temporary file,
    uploads that file to ``<host>api/fileThumbnail`` and then associates
    the uploaded thumbnail with fileid. The temporary file is always
    removed afterwards.

    inputfile -- path of the image to create the thumbnail from
    ext       -- file extension (without the dot) of the thumbnail
    size      -- value passed to convert's -resize option
    host      -- base URL of the server, expected to end with '/'
    fileid    -- id of the file the thumbnail belongs to
    *args     -- accepted but not used

    Raises subprocess.CalledProcessError if convert fails, Exception if
    the generated thumbnail is empty and the error raised by requests'
    raise_for_status() if one of the uploads is rejected.

    NOTE(review): playserverKey is not defined in this snippet; it is
    assumed to be a module level variable holding the server key.
    """
    (fd, thumbnailfile) = tempfile.mkstemp(suffix='.' + ext)
    # mkstemp returns an already opened descriptor; the file is only accessed
    # by name below, so close the descriptor right away to avoid leaking it
    os.close(fd)
    try:
        # convert image to right size
        subprocess.check_output(['convert', inputfile, '-resize', size, thumbnailfile],
                                stderr=subprocess.STDOUT)

        if os.path.getsize(thumbnailfile) == 0:
            raise Exception("File is empty.")

        # upload preview image, closing the file once the request is done
        url = host + 'api/fileThumbnail?key=' + playserverKey
        with open(thumbnailfile, 'rb') as thumbnail:
            r = requests.post(url, files={"File": thumbnail})
        r.raise_for_status()
        thumbnailid = r.json()['id']

        # associate uploaded thumbnail with original file
        url = host + 'api/files/' + fileid + '/thumbnails/' + thumbnailid + '?key=' + playserverKey
        r = requests.post(url)
        r.raise_for_status()

        logger.debug("[%s] created thumbnail of type %s", fileid, ext)
    finally:
        # best-effort cleanup of the temporary thumbnail
        try:
            os.remove(thumbnailfile)
        except OSError:
            pass