...
The purpose of this document is to provide quick examples of each means of incorporating tools, in order to bootstrap one's ability to include code within either of the two services.
...
Medici extractors typically serve to automatically extract some new kind of information from a file's content when it is uploaded into Medici. These extractors do this by connecting to a shared RabbitMQ bus. When a new file is uploaded to Medici it is announced on this bus. Extractors that can handle a file of the type posted on the bus are triggered and the data they in turn create is returned to Medici as derived data to be associated with that file. The extractors themselves can be implemented in a variety of languages.
Java
An extractor must establish a connection with the Medici RabbitMQ bus, handle incoming messages, start jobs based on received messages, and ultimately carry out a job on a given file. The example below simply counts the number of words in a document and returns this information as a piece of metadata to be associated with the file.
Code Block |
---|
theme | Emacs |
---|
language | java |
---|
title | Connecting to RabbitMQ |
---|
|
/**
 * Connects to the Medici RabbitMQ bus, binds this extractor's queue to the
 * topic exchange for plain-text file announcements, and consumes messages
 * until the process is interrupted.
 *
 * @param rabbitMQUsername username for the RabbitMQ broker
 * @param rabbitMQpassword password for the RabbitMQ broker
 */
protected void startExtractor(String rabbitMQUsername,
	String rabbitMQpassword)
{
	try{
		//Open channel and declare exchange and consumer
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(serverAddr);
		factory.setUsername(rabbitMQUsername);
		factory.setPassword(rabbitMQpassword);
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
		channel.queueDeclare(QUEUE_NAME,DURABLE,EXCLUSIVE,AUTO_DELETE,null);
		// Only receive announcements for plain-text files.
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.file.text.plain.#");
		this.channel = channel;
		// create listener; autoAck=false so a message is removed from the
		// queue only after processing has completed and we ack it explicitly
		channel.basicConsume(QUEUE_NAME, false, CONSUMER_TAG, new DefaultConsumer(channel) {
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				// FIX: decode with an explicit charset instead of the platform default
				messageReceived = new String(body, "UTF-8");
				long deliveryTag = envelope.getDeliveryTag();
				// (process the message components here ...)
				// FIX: was " {x} Received", inconsistent with " [x] Done" below
				System.out.println(" [x] Received '" + messageReceived + "'");
				replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
				replyTo = properties.getReplyTo();
				processMessageReceived();
				System.out.println(" [x] Done");
				channel.basicAck(deliveryTag, false);
			}
		});
		// start listening; actual work happens on the consumer callback thread
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		while (true) {
			Thread.sleep(1000);
		}
	}
	catch(InterruptedException ie){
		// FIX: restore the interrupt flag instead of treating interruption
		// as a fatal error
		Thread.currentThread().interrupt();
	}
	catch(Exception e){
		e.printStackTrace();
		System.exit(1);
	}
} |
Code Block |
---|
language | java |
---|
title | Processing Messages Received From RabbitMQ |
---|
|
/**
 * Deserializes the most recently received bus message into an
 * {@link ExtractionJob}, runs the word-count job on the referenced file,
 * uploads the result as an intermediate artifact, and reports status back
 * to Medici. Errors in the job itself are reported; unexpected errors in
 * the reporting path terminate the process.
 */
protected void processMessageReceived()
{
	try {
		try {
			ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this);
			jobReceived = getRepresentation(messageReceived, ExtractionJob.class);
			File textFile = extrServ.processJob(jobReceived);
			jobReceived.setFlag("wasText");
			log.info("Word count extraction complete. Returning word count file as intermediate result.");
			sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log);
			uploadIntermediate(textFile, "text/plain", log);
			textFile.delete();
			sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
		} catch (Exception e) { // FIX: was misleadingly named "ioe" while catching Exception
			log.error("Could not finish extraction job.", e);
			// FIX: jobReceived is null when getRepresentation() itself threw,
			// which made the two sendStatus calls below NPE inside the handler
			if (jobReceived != null) {
				sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log);
				sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
			}
		}
	} catch(Exception e) {
		e.printStackTrace();
		System.exit(1);
	}
} |
Code Block |
---|
language | java |
---|
title | Processing Jobs |
---|
|
public File processJob(ExtractionJob receivedMsg) throws Exception{
{
log.info("Downloading text file with ID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost());
callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log);
DefaultHttpClient httpclient = new DefaultHttpClient();
HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey);
HttpResponse fileResponse = httpclient.execute(httpGet);
log.info(fileResponse.getStatusLine());
if if(fileResponse.getStatusLine().toString().indexOf("200") == -1){
throw new IOException("File not found.");
}
HttpEntity HttpEntity fileEntity = fileResponse.getEntity();
InputStream fileIs = fileEntity.getContent();
Header[] hdrs = fileResponse.getHeaders("content-disposition");
String contentDisp = hdrs[0].toString();
String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9);
File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")), fileName.substring(fileName.lastIndexOf(".")).toLowerCase());
OutputStream fileOs = new FileOutputStream(tempFile);
IOUtils.copy(fileIs,fileOs);
fileIs.close();
fileOs.close();
EntityUtils.consume(fileEntity);
log.info("Download complete. Initiating word count generation");
File textFile = processFile(tempFile, receivedMsg.getId());
return textFile;
} |
Code Block |
---|
language | java |
---|
title | Processing Files |
---|
|
private File processFile(File tempFile, String originalFileId) throws Exception {
{
Runtime r = Runtime.getRuntime();
Process p; // Process tracks one external native process
String tempDir = System.getProperty("java.io.tmpdir");
if (new Character(tempDir.charAt(tempDir.length()-1)).toString().equals(System.getProperty("file.separator")) == false){
tempDir = tempDir + System.getProperty("file.separator");
}
String processCmd = "";
String operSystem = System.getProperty("os.name").toLowerCase();
// TODO: windows impl
if(operSystem.indexOf("nix") >= 0 || operSystem.indexOf("nux") >= 0 || operSystem.indexOf("aix") > 0 ){
"wc -w " + tempDir + tempFile.getName();
}
p = r.exec(processCmd, null, new File(tempDir));
StreamGobbler outputGobbler = new StreamGobbler(p.getInputStream(), "INFO", log);
StreamGobbler errorGobbler = new StreamGobbler(p.getErrorStream(),"ERROR", log);
outputGobbler.start();
errorGobbler.start();
p.waitFor();
File outFile = new File(tempDir + tempFile.getName().substring(0, tempFile.getName().lastIndexOf(".")) + ".txt");
tempFile.delete();
if(!Files.exists(outFile.toPath()))
throw new Exception("File not processed correctly. File is possibly corrupt.");
return outFile;
} |
C++
Code Block |
---|
language | cpp |
---|
title | Connecting to RabbitMQ |
---|
|
#include <amqpcpp.h>
namespace CPPExample
{
/**
 * Connection handler the AMQP-CPP library calls back into for I/O and
 * connection life-cycle events. The library does no networking itself:
 * onData() must hand the bytes to your own socket code.
 */
class RabbitMQConnectionHandler : public AMQP::ConnectionHandler {
	/**
	* Method that is called by the AMQP library every time it has data
	* available that should be sent to RabbitMQ.
	* @param connection pointer to the main connection object
	* @param data memory buffer with the data that should be sent to RabbitMQ
	* @param size size of the buffer
	*/
	virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
	{
		// @todo
		// Add your own implementation, for example by doing a call to the
		// send() system call. But be aware that the send() call may not
		// send all data at once, so you also need to take care of buffering
		// the bytes that could not immediately be sent, and try to send
		// them again when the socket becomes writable again
	}
	/**
	* Method that is called by the AMQP library when the login attempt
	* succeeded. After this method has been called, the connection is ready
	* to use.
	* @param connection The connection that can now be used
	*/
	// FIX: parameter type was the unqualified "Connection", which does not
	// name a type inside namespace CPPExample; it must be AMQP::Connection
	// to match (and override) the base-class virtual.
	virtual void onConnected(AMQP::Connection *connection)
	{
		// @todo
		// add your own implementation, for example by creating a channel
		// instance, and start publishing or consuming
	}
	/**
	* Method that is called by the AMQP library when a fatal error occurs
	* on the connection, for example because data received from RabbitMQ
	* could not be recognized.
	* @param connection The connection on which the error occurred
	* @param message A human readable error message
	*/
	// FIX: qualified the parameter type, as in onData() above.
	virtual void onError(AMQP::Connection *connection, const std::string &message)
	{
		// @todo
		// add your own implementation, for example by reporting the error
		// to the user of your program, log the error, and destruct the
		// connection object because it is no longer in a usable state
	}
};
} |
Code Block |
---|
|
namespace CPPExample
{
/**
* Parse data that was received from RabbitMQ
*
* Every time that data comes in from RabbitMQ, you should call this method to parse
* the incoming data, and let it handle by the AMQP-CPP library. This method returns the number
* of bytes that were processed.
*
* If not all bytes could be processed because it only contained a partial frame, you should
* call this same method later on when more data is available. The AMQP-CPP library does not do
* any buffering, so it is up to the caller to ensure that the old data is also passed in that
* later call.
*
* NOTE(review): `_implementation` reads as a class member, so this method
* presumably belongs inside a connection wrapper class rather than at
* namespace scope as shown here -- confirm against the full example.
*
* @param buffer buffer to decode
* @param size size of the buffer to decode
* @return number of bytes that were processed
*/
size_t parse(char *buffer, size_t size)
{
return _implementation.parse(buffer, size);
}
} |
...
Code Block |
---|
theme | Emacs |
---|
language | py |
---|
title | Connecting to RabbitMQ |
---|
|
# connect to rabbitmq using input username and password
# (credentials come from the command line: argv[1]=username, argv[2]=password)
credentials = pika.PlainCredentials(sys.argv[1], sys.argv[2])
parameters = pika.ConnectionParameters(credentials=credentials)
connection = pika.BlockingConnection(parameters)
# connect to channel
channel = connection.channel()
# declare the exchange (the durable topic exchange shared with Medici)
channel.exchange_declare(exchange='medici', exchange_type='topic', durable=True)
# declare the queue (durable so pending jobs survive a broker restart)
channel.queue_declare(queue=receiver, durable=True)
# connect queue and exchange: only plain-text file announcements are routed here
channel.queue_bind(queue=receiver, exchange='medici', routing_key='*.file.text.plain')
# create listener; no_ack=False means on_message must ack each delivery itself
# NOTE(review): callback-first basic_consume matches the pika 0.x API;
# pika 1.x reordered the arguments to (queue, on_message_callback) -- confirm
# which version the deployment pins.
channel.basic_consume(on_message, queue=receiver, no_ack=False)
# start listening (blocks until interrupted)
logger.info("Waiting for messages. To exit press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
# close connection
connection.close() |
Code Block |
---|
language | py |
---|
title | Processing Messages |
---|
|
def on_message(channel, method, header, body):
global logger
statusreport = {}
inputfile=None
try:
# parse body back from json
jbody=json.loads(body)
host=jbody['host']
fileid=jbody['id']
intermediatefileid=jbody['intermediateId']
if not (host.endswith('/')):
host += '/'
# for status reports
statusreport['file_id'] = fileid
statusreport['extractor_id'] = 'wordCount'
# print what we are doing
logger.debug("[%s] started processing", fileid)
# fetch data
statusreport['status'] = 'Downloading file.'
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
channel.basic_publish(exchange='',
routing_key=header.reply_to,
properties=pika.BasicProperties(correlation_id = \
header.correlation_id),
body=json.dumps(statusreport))
url=host + 'api/files/' + intermediatefileid + '?key=' + playserverKey
r=requests.get(url, stream=True)
r.raise_for_status()
(fd, inputfile)=tempfile.mkstemp()
with os.fdopen(fd, "w") as f:
for chunk in r.iter_content(chunk_size=10*1024):
f.write(chunk)
# create word count
statusreport['status'] = 'Creating word count.'
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
channel.basic_publish(exchange='',
routing_key=header.reply_to,
properties=pika.BasicProperties(correlation_id = \
header.correlation_id),
body=json.dumps(statusreport))
create_word_count(inputfile, ext, host, fileid)
# Ack
channel.basic_ack(method.delivery_tag)
logger.debug("[%s] finished processing", fileid)
except subprocess.CalledProcessError as e:
logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output)
statusreport['status'] = 'Error processing.'
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
channel.basic_publish(exchange='',
routing_key=header.reply_to,
properties=pika.BasicProperties(correlation_id = \
header.correlation_id),
body=json.dumps(statusreport))
except:
logger.exception("[%s] error processing", fileid)
statusreport['status'] = 'Error processing.'
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
channel.basic_publish(exchange='',
routing_key=header.reply_to,
properties=pika.BasicProperties(correlation_id = \
header.correlation_id),
body=json.dumps(statusreport))
finally:
statusreport['status'] = 'DONE.'
statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
channel.basic_publish(exchange='',
routing_key=header.reply_to,
properties=pika.BasicProperties(correlation_id = \
header.correlation_id),
body=json.dumps(statusreport))
if inputfile is not None:
try:
os.remove(inputfile)
except OSError:
pass
except UnboundLocalError:
pass |
Code Block |
---|
language | py |
---|
title | Create Example Job: Word Count |
---|
|
def create_word_count(inputfile, ext, host, fileid):
    # Run `wc` on the downloaded file; the upload step is truncated in this
    # document (the snippet ends mid-body below).
    global logger
    # NOTE(review): mkstemp() here rebinds the `inputfile` parameter to a new
    # empty temp file (and leaks the raw fd), so the wc call below counts the
    # wrong, empty file -- confirm intent against the full source.
    (fd, inputfile)=tempfile.mkstemp(suffix='.' + ext)
    try:
        # make syscall to wc
        subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT)
        # NOTE(review): `wcfile` is never assigned in this snippet; as written
        # this check raises NameError.
        if(os.path.getsize(wcfile) == 0):
            raise Exception("File is empty.")
        # upload word count file |
Calling R Scripts from Python
Coming soon...
Versus Extractors
...
Code Block |
---|
language | java |
---|
title | Extractor |
---|
|
|
Code Block |
---|
|
/**
 * Example Versus measure that stands in for a real word-count comparison:
 * it sleeps for a fixed interval and then reports zero similarity.
 */
public class WordCountMeasure implements Serializable, Measure
{
	/** Simulated comparison cost, in milliseconds. */
	private static final long SLEEP = 10000;

	/** Pretends to compare two descriptors; always yields zero similarity. */
	@Override
	public Similarity compare(Descriptor feature1, Descriptor feature2)
			throws Exception {
		Thread.sleep(SLEEP);
		Similarity result = new SimilarityNumber(0);
		return result;
	}

	/** Normalisation is not provided by this example measure. */
	@Override
	public SimilarityPercentage normalize(Similarity similarity) {
		return null;
	}

	/** Feature type handled by this measure. */
	@Override
	public String getFeatureType() {
		return WordCountMeasure.class.getName();
	}

	/** Human-readable name of this measure. */
	@Override
	public String getName() {
		return "Word Count Measure";
	}

	/** Concrete measure class. */
	@Override
	public Class<WordCountMeasure> getType() {
		return WordCountMeasure.class;
	}
} |