...
Code Block |
---|
|
<?xml version="1.0" encoding="UTF-8"?>
<!--
Load image data from a PGM file and represent the data as a sequence of pixels in row major order.
-->
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:dfdl="http://www.ogf.org/dfdl/dfdl-1.0/" xmlns:ex="http://example.com" targetNamespace="http://example.com">
<xs:include schemaLocation="xsd/built-in-formats.xsd"/>
<xs:annotation>
<xs:appinfo source="http://www.ogf.org/dfdl/">
<dfdl:format ref="ex:daffodilTest1" separator="" initiator="" terminator="" leadingSkip='0' textTrimKind="none" initiatedContent="no"
alignment="implicit" alignmentUnits="bits" trailingSkip="0" ignoreCase="no" separatorPolicy="suppressed"
separatorPosition="infix" occursCountKind="parsed" emptyValueDelimiterPolicy="both" representation="text"
textNumberRep="standard" lengthKind="delimited" encoding="ASCII"/>
</xs:appinfo>
</xs:annotation>
<xs:element name="file">
<xs:complexType>
<xs:sequence>
<xs:element name="header" dfdl:lengthKind="implicit" maxOccurs="1">
<xs:complexType>
<xs:sequence dfdl:sequenceKind="ordered" dfdl:separator="%NL;" dfdl:separatorPosition="postfix">
<xs:element name="type" type="xs:string"/>
<xs:element name="dimensions" maxOccurs="1" dfdl:occursCountKind="implicit">
<xs:complexType>
<xs:sequence dfdl:sequenceKind="ordered" dfdl:separator="%SP;">
<xs:element name="width" type="xs:integer"/>
<xs:element name="height" type="xs:integer"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="depth" type="xs:integer"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="pixels" dfdl:lengthKind="implicit" maxOccurs="1">
<xs:complexType>
<xs:sequence dfdl:separator="%SP; %NL; %SP;%NL;" dfdl:separatorPosition="postfix" dfdl:separatorSuppressionPolicy="anyEmpty">
<xs:element name="pixel" type="xs:integer" maxOccurs="unbounded" dfdl:occursCountKind="expression"
dfdl:occursCount="{../../ex:header/ex:dimensions/ex:width * ../../ex:header/ex:dimensions/ex:height }"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema> |
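For readers unfamiliar with the layout this schema describes, a rough Python sketch of the same structure — header (type, width, height, depth) followed by width × height pixels in row-major order. The function and field names here are illustrative, not part of DFDL or Daffodil:

```python
def parse_pgm(text):
    """Parse a plain (ASCII, 'P2') PGM file into header fields and a
    row-major pixel list, mirroring the structure of the DFDL schema."""
    tokens = text.split()
    header = {"type": tokens[0],
              "width": int(tokens[1]),
              "height": int(tokens[2]),
              "depth": int(tokens[3])}
    pixels = [int(t) for t in tokens[4:]]
    # like dfdl:occursCount in the schema: width * height pixels expected
    assert len(pixels) == header["width"] * header["height"]
    return header, pixels

sample = "P2\n2 2\n255\n0 64\n128 255\n"
header, pixels = parse_pgm(sample)
# pixels are in row-major order: row 0 first, then row 1
```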
...
Medici extractors typically serve to automatically extract some new kind of information from a file's content when it is uploaded into Medici. These extractors do this by connecting to a shared RabbitMQ bus. When a new file is uploaded to Medici, it is announced on this bus. Extractors that can handle a file of the announced type are triggered, and the data they in turn create is returned to Medici as derived data associated with that file. The extractors themselves can be implemented in a variety of languages; examples of extractors in different languages can be found in the extractors-templates code repository.
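The binding keys used by these extractors (for example `*.file.text.plain.#` in the Java code below) are AMQP topic patterns. As a rough illustration of how such a pattern selects which uploads an extractor sees — this is a sketch of the matching rules, not RabbitMQ's actual implementation:

```python
def topic_matches(binding_key, routing_key):
    """Sketch of AMQP topic matching: '*' matches exactly one dot-separated
    word, '#' matches zero or more words. Illustrates why an extractor bound
    with '*.file.text.plain.#' is triggered for plain-text uploads only."""
    def match(pat, words):
        if not pat:
            return not words
        if pat[0] == "#":
            # '#' may consume any number of remaining words
            return any(match(pat[1:], words[i:]) for i in range(len(words) + 1))
        if words and (pat[0] == "*" or pat[0] == words[0]):
            return match(pat[1:], words[1:])
        return False
    return match(binding_key.split("."), routing_key.split("."))

# a plain-text upload announcement is routed to the extractor,
# while an image upload announcement is not
```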
Java
An extractor must establish a connection with the Medici RabbitMQ bus, handle incoming messages, start jobs based on received messages, and ultimately carry out a job on a given file. The example below simply counts the number of words in a document and returns this information as a piece of metadata to be associated with the file.
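Stripped of the messaging machinery, the job itself is tiny; a sketch in Python (the function names here are illustrative, not the Java API shown below):

```python
import os
import tempfile

def count_words(path):
    """Count the words in a text file: the job the example extractor performs."""
    with open(path, encoding="utf-8") as f:
        return sum(len(line.split()) for line in f)

def make_metadata(file_id, word_count):
    # the derived data returned to Medici and associated with the file
    return {"file_id": file_id, "extractor_id": "wordCount",
            "word_count": word_count}

# try it on a small temporary file
fd, path = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as f:
    f.write("the quick brown fox\njumps over the lazy dog\n")
n = count_words(path)
os.remove(path)
```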
Code Block |
---|
theme | Emacs |
---|
language | java |
---|
title | Connecting to RabbitMQ |
---|
|
protected void startExtractor(String rabbitMQUsername, String rabbitMQpassword)
{
try{
//Open channel and declare exchange and consumer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(serverAddr);
factory.setUsername(rabbitMQUsername);
factory.setPassword(rabbitMQpassword);
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME,DURABLE,EXCLUSIVE,AUTO_DELETE,null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.file.text.plain.#");
this.channel = channel;
// create listener
channel.basicConsume(QUEUE_NAME, false, CONSUMER_TAG, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
messageReceived = new String(body);
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
System.out.println(" [x] Received '" + messageReceived + "'");
replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
replyTo = properties.getReplyTo();
processMessageReceived();
System.out.println(" [x] Done");
channel.basicAck(deliveryTag, false);
}
});
// start listening
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
while (true) {
Thread.sleep(1000);
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
} |
Java extractors can be created using the amqp-client jar file. This allows you to connect to the RabbitMQ bus and receive messages. The easiest way to get up and running is to use Maven to pull in the required dependencies. An example of an extractor written in Java can be found at Medici Extractor in Java.
Python
Python extractors will often be based on the packages pika and requests. These allow you to connect to the RabbitMQ message bus and easily send requests to Medici. A complete example of a Python extractor can be found at Medici Extractor in Python.
Calling R Scripts from Python
Coming soon...
Versus Extractors
Versus extractors serve to extract a signature from a file's content. These signatures, effectively a hash of the data, are typically numerical vectors that capture some semantically meaningful aspect of the content, so that two such signatures can be compared using some distance measure. Within Versus, extractors operate on a data structure representing the content of a file, produced by a Versus adapter, and the returned signatures are compared by either a Versus similarity or distance measure. Together these adapters, extractors, and measures compose a comparison, which can be used to relate files according to their contents.
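The adapter/extractor/measure pipeline can be sketched with plain functions; the names below are hypothetical stand-ins for illustration, not the Versus API:

```python
def adapt(text):              # adapter: file content -> data structure (words)
    return text.split()

def extract(words):           # extractor: data structure -> signature (histogram)
    sig = {}
    for w in words:
        sig[w] = sig.get(w, 0) + 1
    return sig

def measure(sig1, sig2):      # measure: two signatures -> distance
    labels = set(sig1) | set(sig2)
    return sum((sig1.get(l, 0) - sig2.get(l, 0)) ** 2 for l in labels) ** 0.5

def compare(text1, text2):    # the composed pairwise comparison
    return measure(extract(adapt(text1)), extract(adapt(text2)))
```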
Java
The main class sets up the comparison by adding the two files to be compared, the adapter used to load them, the extractor that pulls a feature from each file, and the measure used to compare the two features.
Code Block |
---|
|
static public void main(String[] args) {
    PairwiseComparison comparison = new PairwiseComparison();
    comparison.setId(UUID.randomUUID().toString());
    comparison.setFirstDataset(new File("data/test1.txt"));
    comparison.setSecondDataset(new File("data/test2.txt"));
    comparison.setAdapterId(TextAdapter.class.getName());
    comparison.setExtractorId(TextHistogramExtractor.class.getName());
    comparison.setMeasureId(LabelHistogramEuclidianDistanceMeasure.class.getName());
    ExecutionEngine ee = new ExecutionEngine();
    ee.submit(comparison, new ComparisonStatusHandler() {
        @Override
        public void onStarted() {
            System.out.println("STARTED : ");
        }
        @Override
        public void onFailed(String msg, Throwable e) {
            System.out.println("FAILED : " + msg);
            e.printStackTrace();
            System.exit(0);
        }
        @Override
        public void onDone(double value) {
            System.out.println("DONE : " + value);
            System.exit(0);
        }
        @Override
        public void onAborted(String msg) {
            System.out.println("ABORTED : " + msg);
            System.exit(0);
        }
    });
} |
Code Block |
---|
language | java |
---|
title | Processing Messages |
---|
|
protected void processMessageReceived()
{
    try {
        try {
            ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this);
            jobReceived = getRepresentation(messageReceived, ExtractionJob.class);
            File textFile = extrServ.processJob(jobReceived);
            jobReceived.setFlag("wasText");
            log.info("Word count extraction complete. Returning word count file as intermediate result.");
            sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log);
            uploadIntermediate(textFile, "text/plain", log);
            textFile.delete();
            sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
        } catch (Exception ioe) {
            log.error("Could not finish extraction job.", ioe);
            sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log);
            sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
        }
    } catch(Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
} |
Code Block |
---|
language | java |
---|
title | Processing Jobs |
---|
|
public File processJob(ExtractionJob receivedMsg) throws Exception
{
    log.info("Downloading text file with ID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost());
    callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log);
    DefaultHttpClient httpclient = new DefaultHttpClient();
    HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey);
    HttpResponse fileResponse = httpclient.execute(httpGet);
    log.info(fileResponse.getStatusLine());
    if(fileResponse.getStatusLine().toString().indexOf("200") == -1){
        throw new IOException("File not found.");
    }
    HttpEntity fileEntity = fileResponse.getEntity();
    InputStream fileIs = fileEntity.getContent();
    Header[] hdrs = fileResponse.getHeaders("content-disposition");
    String contentDisp = hdrs[0].toString();
    String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9);
    File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")), fileName.substring(fileName.lastIndexOf(".")).toLowerCase());
    OutputStream fileOs = new FileOutputStream(tempFile);
    IOUtils.copy(fileIs, fileOs);
    fileIs.close();
    fileOs.close();
    EntityUtils.consume(fileEntity);
    log.info("Download complete. Initiating word count generation");
    File textFile = processFile(tempFile, receivedMsg.getId());
    return textFile;
} |
The text adapter takes a text file, loads the whole file, splits the text into words, and returns a list of all words in the text. The words remain in their original order, so the original content of the file can still be read by traversing the words in the order returned by getWords().
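A rough Python sketch of the same behaviour, for illustration only:

```python
def get_words(text):
    # split each line on spaces, keeping the words in their original
    # order, mirroring what the adapter's getWords() returns
    words = []
    for line in text.splitlines():
        words.extend(line.split(" "))
    return words

# because order is preserved, joining the words recovers the text
```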
Code Block |
---|
language | java |
---|
title | Text Adapter |
---|
|
public class TextAdapter implements FileLoader, HasText {
    private File file;
    private List<String> words;
    public TextAdapter() {}
    // ----------------------------------------------------------------------
    // FileLoader
    // ----------------------------------------------------------------------
    @Override
    public void load(File file) {
        this.file = file;
    }
    @Override
    public String getName() {
        return "Text Document";
    }
    @Override
    public List<String> getSupportedMediaTypes() {
        List<String> mediaTypes = new ArrayList<String>();
        mediaTypes.add("text/*");
        return mediaTypes;
    }
    // ----------------------------------------------------------------------
    // HasText
    // ----------------------------------------------------------------------
    @Override
    public List<String> getWords() {
        if (words == null) {
            words = new ArrayList<String>();
            try {
                BufferedReader br = new BufferedReader(new FileReader(file));
                String line;
                while((line = br.readLine()) != null) {
                    String[] w = line.split(" ");
                    words.addAll(Arrays.asList(w));
                }
                br.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return words;
    }
} |
Code Block |
---|
language | java |
---|
title | Processing Files |
---|
|
private File processFile(File tempFile, String originalFileId) throws Exception
{
    Runtime r = Runtime.getRuntime();
    Process p;
    String tempDir = System.getProperty("java.io.tmpdir");
    if (new Character(tempDir.charAt(tempDir.length()-1)).toString().equals(System.getProperty("file.separator")) == false){
        tempDir = tempDir + System.getProperty("file.separator");
    }
    String processCmd = "";
    String operSystem = System.getProperty("os.name").toLowerCase();
    // TODO: windows impl
    if(operSystem.indexOf("nix") >= 0 || operSystem.indexOf("nux") >= 0 || operSystem.indexOf("aix") > 0 ){
        processCmd = "wc -w " + tempDir + tempFile.getName();
    }
    p = r.exec(processCmd, null, new File(tempDir));
    StreamGobbler outputGobbler = new StreamGobbler(p.getInputStream(), "INFO", log);
    StreamGobbler errorGobbler = new StreamGobbler(p.getErrorStream(), "ERROR", log);
    outputGobbler.start();
    errorGobbler.start();
    p.waitFor();
    File outFile = new File(tempDir + tempFile.getName().substring(0, tempFile.getName().lastIndexOf(".")) + ".txt");
    tempFile.delete();
    if(!Files.exists(outFile.toPath()))
        throw new Exception("File not processed correctly. File is possibly corrupt.");
    return outFile;
} |
...
Code Block |
---|
language | cpp |
---|
title | Connecting to RabbitMQ |
---|
|
#include <amqpcpp.h>
namespace CPPExample
{
    class RabbitMQConnectionHandler : public AMQP::ConnectionHandler
    {
        /**
         * Method that is called by the AMQP library every time it has data
         * available that should be sent to RabbitMQ.
         * @param connection pointer to the main connection object
         * @param data memory buffer with the data that should be sent to RabbitMQ
         * @param size size of the buffer
         */
        virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
        {
            // @todo
            // Add your own implementation, for example by doing a call to the
            // send() system call. But be aware that the send() call may not
            // send all data at once, so you also need to take care of buffering
            // the bytes that could not immediately be sent, and try to send
            // them again when the socket becomes writable again
        }

        /**
         * Method that is called by the AMQP library when the login attempt
         * succeeded. After this method has been called, the connection is ready
         * to use.
         * @param connection The connection that can now be used
         */
        virtual void onConnected(Connection *connection)
        {
            // @todo
            // add your own implementation, for example by creating a channel
            // instance, and start publishing or consuming
        }

        /**
         * Method that is called by the AMQP library when a fatal error occurs
         * on the connection, for example because data received from RabbitMQ
         * could not be recognized.
         * @param connection The connection on which the error occured
         * @param message A human readable error message
         */
        virtual void onError(Connection *connection, const std::string &message)
        {
            // @todo
            // add your own implementation, for example by reporting the error
            // to the user of your program, log the error, and destruct the
            // connection object because it is no longer in a usable state
        }
    };
} |
The extractor takes the words returned by the adapter and counts the occurrences of each word. At this point we are left with a histogram of all words and how often they occur in the text; we can no longer reconstruct the text, since the information about the order of the words is lost.
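The same idea in a couple of lines of Python, for illustration:

```python
from collections import Counter

words = ["to", "be", "or", "not", "to", "be"]
histogram = Counter(words)   # word -> occurrence count; word order is lost
```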
Code Block |
---|
language | java |
---|
title | Text Histogram Extractor |
---|
|
public class TextHistogramExtractor implements Extractor
{
    @Override
    public Adapter newAdapter() {
        throw (new RuntimeException("Not supported."));
    }
    @Override
    public String getName() {
        return "Text Histogram Extractor";
    }
    @Override
    public Set<Class<? extends Adapter>> supportedAdapters() {
        Set<Class<? extends Adapter>> adapters = new HashSet<Class<? extends Adapter>>();
        adapters.add(HasText.class);
        return adapters;
    }
    @Override
    public Class<? extends Descriptor> getFeatureType() {
        return LabelHistogramDescriptor.class;
    }
    @Override
    public Descriptor extract(Adapter adapter) throws Exception {
        if (adapter instanceof HasText) {
            LabelHistogramDescriptor desc = new LabelHistogramDescriptor();
            for (String word : ((HasText) adapter).getWords()) {
                desc.increaseBin(word);
            }
            return desc;
        } else {
            throw new UnsupportedTypeException();
        }
    }
    @Override
    public boolean hasPreview(){
        return false;
    }
    @Override
    public String previewName(){
        return null;
    }
} |
To compare two texts we use the Euclidean distance between the two histograms. First we normalize each histogram, so that a large text can be compared with a small one; next we compare each bin of the two histograms. If a bin is missing from either histogram, it is assumed to have a value of 0.
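A compact Python sketch of the same computation (illustrative only, not the Versus code):

```python
def normalize(hist):
    # scale bin counts so they sum to 1, making texts of different
    # lengths comparable
    total = float(sum(hist.values()))
    return {k: v / total for k, v in hist.items()}

def euclidean(h1, h2):
    h1, h2 = normalize(h1), normalize(h2)
    labels = set(h1) | set(h2)   # a bin missing from one side counts as 0
    return sum((h1.get(l, 0.0) - h2.get(l, 0.0)) ** 2 for l in labels) ** 0.5
```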
Code Block |
---|
language | java |
---|
title | Euclidian Distance Measure |
---|
|
public class LabelHistogramEuclidianDistanceMeasure implements Measure
{
    @Override
    public SimilarityPercentage normalize(Similarity similarity) {
        return new SimilarityPercentage(1 - similarity.getValue());
    }
    @Override
    public String getFeatureType() {
        return LabelHistogramDescriptor.class.getName();
    }
    @Override
    public String getName() {
        return "Histogram Distance";
    }
    @Override
    public Class<LabelHistogramEuclidianDistanceMeasure> getType() {
        return LabelHistogramEuclidianDistanceMeasure.class;
    }
    @Override
    public Similarity compare(Descriptor desc1, Descriptor desc2) throws Exception {
        if ((desc1 instanceof LabelHistogramDescriptor) && (desc2 instanceof LabelHistogramDescriptor)) {
            LabelHistogramDescriptor lhd1 = (LabelHistogramDescriptor) desc1;
            LabelHistogramDescriptor lhd2 = (LabelHistogramDescriptor) desc2;
            // get all possible labels
            Set<String> labels = new HashSet<String>();
            labels.addAll(lhd1.getLabels());
            labels.addAll(lhd2.getLabels());
            // normalize
            lhd1.normalize();
            lhd2.normalize();
            // compute distance
            double sum = 0;
            for (String s : labels) {
                Double b1 = lhd1.getBin(s);
                Double b2 = lhd2.getBin(s);
                if (b1 == null) {
                    sum += b2 * b2;
                } else if (b2 == null) {
                    sum += b1 * b1;
                } else {
                    sum += (b1 - b2) * (b1 - b2);
                }
            }
            return new SimilarityNumber(Math.sqrt(sum), 0, 1, 0);
        } else {
            throw new UnsupportedTypeException();
        }
    }
} |
Code Block |
---|
|
namespace CPPExample
{
    /**
     * Parse data that was received from RabbitMQ
     *
     * Every time that data comes in from RabbitMQ, you should call this method to parse
     * the incoming data, and let it be handled by the AMQP-CPP library. This method returns the number
     * of bytes that were processed.
     *
     * If not all bytes could be processed because it only contained a partial frame, you should
     * call this same method later on when more data is available. The AMQP-CPP library does not do
     * any buffering, so it is up to the caller to ensure that the old data is also passed in that
     * later call.
     *
     * @param buffer buffer to decode
     * @param size size of the buffer to decode
     * @return number of bytes that were processed
     */
    size_t parse(char *buffer, size_t size)
    {
        return _implementation.parse(buffer, size);
    }
} |
...
Code Block |
---|
theme | Emacs |
---|
language | py |
---|
title | Instantiating the logger and starting the extractor |
---|
|
def main():
    global logger
    # name of receiver
    receiver='ExamplePythonExtractor'
    # configure the logging system
    logging.basicConfig(format="%(asctime)-15s %(name)-10s %(levelname)-7s : %(message)s", level=logging.WARN)
    logger = logging.getLogger(receiver)
    logger.setLevel(logging.DEBUG)
    if len(sys.argv) != 5:
        logger.info("Input RabbitMQ username, followed by RabbitMQ password, Medici REST API key and exchange name.")
        sys.exit()
    global playserverKey
    playserverKey = sys.argv[3]
    global exchange_name
    exchange_name = sys.argv[4] |
Code Block |
---|
theme | Emacs |
---|
language | py |
---|
title | Connecting to RabbitMQ |
---|
|
# connect to rabbitmq using input username and password
credentials = pika.PlainCredentials(sys.argv[1], sys.argv[2])
parameters = pika.ConnectionParameters(credentials=credentials)
connection = pika.BlockingConnection(parameters)
# connect to channel
channel = connection.channel()
# declare the exchange
channel.exchange_declare(exchange='medici', exchange_type='topic', durable=True)
# declare the queue
channel.queue_declare(queue=receiver, durable=True)
# connect queue and exchange
channel.queue_bind(queue=receiver, exchange='medici', routing_key='*.file.text.plain')
# create listener
channel.basic_consume(on_message, queue=receiver, no_ack=False)
# start listening
logger.info("Waiting for messages. To exit press CTRL+C")
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
# close connection
connection.close() |
Code Block |
---|
language | py |
---|
title | Processing Messages |
---|
|
def on_message(channel, method, header, body):
    global logger
    statusreport = {}
    inputfile=None
    try:
        # parse body back from json
        jbody=json.loads(body)
        host=jbody['host']
        fileid=jbody['id']
        intermediatefileid=jbody['intermediateId']
        if not (host.endswith('/')):
            host += '/'
        # for status reports
        statusreport['file_id'] = fileid
        statusreport['extractor_id'] = 'wordCount'
        # print what we are doing
        logger.debug("[%s] started processing", fileid)
        # fetch data
        statusreport['status'] = 'Downloading file.'
        statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
        channel.basic_publish(exchange='',
                              routing_key=header.reply_to,
                              properties=pika.BasicProperties(correlation_id=header.correlation_id),
                              body=json.dumps(statusreport))
        url=host + 'api/files/' + intermediatefileid + '?key=' + playserverKey
        r=requests.get(url, stream=True)
        r.raise_for_status()
        (fd, inputfile)=tempfile.mkstemp()
        with os.fdopen(fd, "w") as f:
            for chunk in r.iter_content(chunk_size=10*1024):
                f.write(chunk)
        # create word count
        statusreport['status'] = 'Creating word count.'
        statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
        channel.basic_publish(exchange='',
                              routing_key=header.reply_to,
                              properties=pika.BasicProperties(correlation_id=header.correlation_id),
                              body=json.dumps(statusreport))
        create_word_count(inputfile, ext, host, fileid)
        # Ack
        channel.basic_ack(method.delivery_tag)
        logger.debug("[%s] finished processing", fileid)
    except subprocess.CalledProcessError as e:
        logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output)
        statusreport['status'] = 'Error processing.'
        statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
        channel.basic_publish(exchange='',
                              routing_key=header.reply_to,
                              properties=pika.BasicProperties(correlation_id=header.correlation_id),
                              body=json.dumps(statusreport))
    except:
        logger.exception("[%s] error processing", fileid)
        statusreport['status'] = 'Error processing.'
        statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
        channel.basic_publish(exchange='',
                              routing_key=header.reply_to,
                              properties=pika.BasicProperties(correlation_id=header.correlation_id),
                              body=json.dumps(statusreport))
    finally:
        statusreport['status'] = 'DONE.'
        statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
        channel.basic_publish(exchange='',
                              routing_key=header.reply_to,
                              properties=pika.BasicProperties(correlation_id=header.correlation_id),
                              body=json.dumps(statusreport))
        if inputfile is not None:
            try:
                os.remove(inputfile)
            except OSError:
                pass |
Code Block |
---|
language | py |
---|
title | Example Job: Word Count |
---|
|
def create_word_count(inputfile, ext, host, fileid):
    global logger
    (fd, wcfile)=tempfile.mkstemp(suffix='.' + ext)
    try:
        # make syscall to wc, capturing its output
        with os.fdopen(fd, "w") as f:
            f.write(subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT))
        if(os.path.getsize(wcfile) == 0):
            raise Exception("File is empty.")
        # upload word count file |
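The block above shells out to `wc`; for illustration, its output line can be parsed like this (a hypothetical helper, not part of the example; decode `check_output()`'s bytes before passing them in):

```python
def parse_wc_output(output):
    """Parse the `lines words bytes filename` columns printed by wc."""
    lines, words, nbytes = output.split()[:3]
    return {"lines": int(lines), "words": int(words), "bytes": int(nbytes)}
```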
Calling R Scripts from Python
Coming soon...
...
Versus extractors serve to extract a signature from a file's content. These signatures, effectively a hash of the data, are typically numerical vectors that capture some semantically meaningful aspect of the content, so that two such signatures can be compared using some distance measure. Within Versus, extractors operate on a data structure representing the content of a file, produced by a Versus adapter, and the returned signatures are compared by either a Versus similarity or distance measure. Together these adapters, extractors, and measures compose a comparison, which can be used to relate files according to their contents.
...
Code Block |
---|
language | java |
---|
title | Extractor |
---|
|
|
Code Block |
---|
|
public class WordCountMeasure implements Serializable, Measure
{
    private static final long SLEEP = 10000;
    @Override
    public Similarity compare(Descriptor feature1, Descriptor feature2) throws Exception {
        Thread.sleep(SLEEP);
        return new SimilarityNumber(0);
    }
    @Override
    public SimilarityPercentage normalize(Similarity similarity) {
        return null;
    }
    @Override
    public String getFeatureType() {
        return WordCountMeasure.class.getName();
    }
    @Override
    public String getName() {
        return "Word Count Measure";
    }
    @Override
    public Class<WordCountMeasure> getType() {
        return WordCountMeasure.class;
    }
} |