Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Medici extractors typically serve to automatically extract some new kind of information from a file's content when its it is uploaded into Medici.  These extractors do this by connecting to a shared RabbitMQ bus.  When a new file is uploaded to Medici it is announced on this bus.  Extractors that can handle a file of the type posted on the bus are triggered and the data they in turn create is returned to Medici as derived data to be associated with that file.  The extractors themselves can be implemented in a variety of languages.

...

Code Block
themeEmacs
languagejava
titleConnecting to RabbitMQ
protected void startExtractor(String rabbitMQUsername,
	String rabbitMQpassword) {
	try{ 
 		//Open channel and declare exchange and consumer
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(serverAddr);
		factory.setUsername(rabbitMQUsername);
		factory.setPassword(rabbitMQpassword);
		Connection connection = factory.newConnection();

 		final Channel channel = connection.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

		channel.queueDeclare(QUEUE_NAME,DURABLE,EXCLUSIVE,AUTO_DELETE,null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.file.text.plain.#");
 
 		this.channel = channel;

 		// create listener
		channel.basicConsume(QUEUE_NAME, false, CONSUMER_TAG, new DefaultConsumer(channel) {
 			@Override
 			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				messageReceived = new String(body);
 				long deliveryTag = envelope.getDeliveryTag();
 				// (process the message components here ...)
				System.out.println(" {x} Received '" + messageReceived + "'");
 
				replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
				replyTo = properties.getReplyTo();
 
				processMessageReceived();
				System.out.println(" [x] Done");
				channel.basicAck(deliveryTag, false);
			}
		});

 		// start listening 
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 		while (true) {
			Thread.sleep(1000);
		}
	}
 	catch(Exception e){
		e.printStackTrace();
		System.exit(1);
	} 
}

 

Anchor
C++
C++
C++

Code Block
languagecpp
titleConnecting to RabbitMQ
#include <amqpcpp.h>

namespace CPPExample {

  class RabbitMQConnectionHandler : public AMQP::ConnectionHandler {
      /**
      *  Method that is called by the AMQP library every time it has data
      *  available that should be sent to RabbitMQ. 
      *  @param  connection  pointer to the main connection object  
      *  @param  data        memory buffer with the data that should be sent to RabbitMQ
      *  @param  size        size of the buffer
      */
     virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
     {
         // @todo 
         //  Add your own implementation, for example by doing a call to the
         //  send() system call. But be aware that the send() call may not
         //  send all data at once, so you also need to take care of buffering
         //  the bytes that could not immediately be sent, and try to send 
         //  them again when the socket becomes writable again
     }

      /**
      *  Method that is called by the AMQP library when the login attempt 
      *  succeeded. After this method has been called, the connection is ready 
      *  to use.
      *  @param  connection      The connection that can now be used
      */
      virtual void onConnected(Connection *connection)
      {
         // @todo
         //  add your own implementation, for example by creating a channel 
         //  instance, and start publishing or consuming
      }

      /**
      *  Method that is called by the AMQP library when a fatal error occurs
      *  on the connection, for example because data received from RabbitMQ
      *  could not be recognized.
      *  @param  connection      The connection on which the error occured
      *  @param  message         A human readable error message
      */
      virtual void onError(Connection *connection, const std::string &message)
      {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the 
        //  connection object because it is no longer in a usable state
      }
  };

}
Code Block
languagecpp
titleReceiver
namespace CPPExample {

  /**
   *  Parse data that was recevied from RabbitMQ
   *  
   *  Every time that data comes in from RabbitMQ, you should call this method to parse
   *  the incoming data, and let it handle by the AMQP-CPP library. This method returns the number
   *  of bytes that were processed.
   *
   *  If not all bytes could be processed because it only contained a partial frame, you should
   *  call this same method later on when more data is available. The AMQP-CPP library does not do
   *  any buffering, so it is up to the caller to ensure that the old data is also passed in that
   *  later call.
   *
   *  @param  buffer      buffer to decode
   *  @param  size        size of the buffer to decode
   *  @return             number of bytes that were processed
   */
  size_t parse(char *buffer, size_t size)
  {
     return _implementation.parse(buffer, size);
  }
}

...

Anchor
Python
Python
Python

 

Code Block
themeEmacs
languagepy
titleInstantiating the logger and starting the extractor
def main():
 global logger

 # name of receiver
receiver='ExamplePythonExtractor'

 # configure the logging system
logging.basicConfig(format="%(asctime)-15s %(name)-10s %(levelname)-7s : %(message)s", level=logging.WARN)
logger = logging.getLogger(receiver)
logger.setLevel(logging.DEBUG)
 
 if len(sys.argv) != 4:
logger.info("Input RabbitMQ username, followed by RabbitMQ password and Medici REST API key.")
sys.exit()
 
 global playserverKey
playserverKey = sys.argv[3]
Code Block
themeEmacs
languagepy
titleConnecting to RabbitMQ
# connect to rabbitmq using input username and password 
credentials = pika.PlainCredentials(sys.argv[1], sys.argv[2])
parameters = pika.ConnectionParameters(credentials=credentials)
connection = pika.BlockingConnection(parameters)
 
 # connect to channel
channel = connection.channel()

 # declare the exchange
channel.exchange_declare(exchange='medici', exchange_type='topic', durable=True)

 # declare the queue
channel.queue_declare(queue=receiver, durable=True)

 # connect queue and exchange
channel.queue_bind(queue=receiver, exchange='medici', routing_key='*.file.text.plain')

 # create listener
channel.basic_consume(on_message, queue=receiver, no_ack=False)

 # start listening
logger.info("Waiting for messages. To exit press CTRL+C")
 try:
channel.start_consuming()
 except KeyboardInterrupt:
channel.stop_consuming()

Anchor
Versus
Versus
Versus Extractors

Versus extractors serve to extract a signature from a file's content.  These signatures, effectively a hash for the data, are typically numerical vectors which capture some semantically meaningful aspect of the content so that two such signatures can then be compared using some distance measure.  Within Versus extractors operate on a data structure representing the content of a file, produced a Versus adapter, and the returned signatures compared by either a Versus similarity or distance measure.  The combination of these adapters, extractors, and measures in turn compose a comparison which can be used for relating files according their contents.

Anchor
Java Measure
Java Measure
Java

...