...
Medici extractors typically serve to automatically extract some new kind of information from a file's content when it is uploaded into Medici. These extractors do this by connecting to a shared RabbitMQ bus. When a new file is uploaded to Medici it is announced on this bus. Extractors that can handle a file of the type posted on the bus are triggered and the data they in turn create is returned to Medici as derived data to be associated with that file. The extractors themselves can be implemented in a variety of languages. Examples of these extractors in different languages can be found in the extractors-templates code repository.
Java
An extractor must establish a connection with the Medici RabbitMQ bus, handle incoming messages, start jobs based on received messages, and ultimately carry out a job on a given file. The example below simply counts the number of words in a document and returns this information as a piece of metadata to be associated with the file.
Code Block |
---|
theme | Emacs |
---|
language | java |
---|
title | Connecting to RabbitMQ |
---|
|
/**
 * Connects to the Medici RabbitMQ broker and consumes extraction requests for plain-text files.
 *
 * <p>Declares the durable topic exchange {@code EXCHANGE_NAME}, declares {@code QUEUE_NAME},
 * binds it with routing key {@code *.file.text.plain.#} and registers a consumer that runs
 * {@link #processMessageReceived()} for every message before acknowledging it. The method then
 * blocks forever; any failure prints the stack trace and terminates the JVM with status 1.
 *
 * @param rabbitMQUsername user name for the broker at {@code serverAddr}
 * @param rabbitMQpassword password for that user
 */
protected void startExtractor(String rabbitMQUsername, String rabbitMQpassword)
{
    try {
        // Open channel and declare exchange and queue
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(serverAddr);
        factory.setUsername(rabbitMQUsername);
        factory.setPassword(rabbitMQpassword);
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.queueDeclare(QUEUE_NAME, DURABLE, EXCLUSIVE, AUTO_DELETE, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.file.text.plain.#");
        this.channel = channel;
        // create listener; autoAck is false, so a message is acked only after it has been processed
        channel.basicConsume(QUEUE_NAME, false, CONSUMER_TAG, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8; new String(byte[]) would use the platform default charset.
                messageReceived = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                long deliveryTag = envelope.getDeliveryTag();
                System.out.println(" {x} Received '" + messageReceived + "'");
                // Remember the sender's correlation id and reply queue (presumably used by sendStatus).
                replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
                replyTo = properties.getReplyTo();
                processMessageReceived();
                System.out.println(" [x] Done");
                channel.basicAck(deliveryTag, false);
            }
        });
        // start listening; deliveries arrive through the consumer callback, this loop only keeps
        // the method from returning
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        while (true) {
            Thread.sleep(1000);
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
} |
Code Block |
---|
language | java |
---|
title | Processing Messages |
---|
|
/**
 * Parses the last received message into an {@code ExtractionJob}, runs the word-count job and
 * uploads the result as an intermediate file, reporting progress through {@code sendStatus}.
 *
 * <p>Failures of the job itself are logged and reported as "Could not finish extraction job."
 * followed by "DONE.". If the message cannot even be parsed there is no job id to report to,
 * so the failure is only logged. Any exception escaping the reporting itself terminates the
 * JVM with status 1.
 */
protected void processMessageReceived()
{
    try {
        // Clear the job from the previous message so a parse failure cannot report
        // status against the wrong job id.
        jobReceived = null;
        File textFile = null;
        try {
            ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this);
            jobReceived = getRepresentation(messageReceived, ExtractionJob.class);
            textFile = extrServ.processJob(jobReceived);
            jobReceived.setFlag("wasText");
            log.info("Word count extraction complete. Returning word count file as intermediate result.");
            sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log);
            uploadIntermediate(textFile, "text/plain", log);
            sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
        } catch (Exception ioe) {
            log.error("Could not finish extraction job.", ioe);
            // Without a parsed job there is no id to send status to.
            if (jobReceived != null) {
                sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log);
                sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
            }
        } finally {
            // Remove the intermediate result even when the upload failed.
            if (textFile != null) {
                textFile.delete();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }
} |
Code Block |
---|
language | java |
---|
title | Processing Jobs |
---|
|
/**
 * Downloads the intermediate file referenced by an extraction job and runs the word count on it.
 *
 * @param receivedMsg the extraction job parsed from the RabbitMQ message
 * @return the word-count file produced by {@code processFile}
 * @throws IOException if the server does not answer with HTTP 200 or the download fails
 * @throws Exception   propagated from {@code processFile}
 */
public File processJob(ExtractionJob receivedMsg) throws Exception
{
    log.info("Downloading text file with ID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost());
    callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log);
    DefaultHttpClient httpclient = new DefaultHttpClient();
    File tempFile;
    try {
        HttpGet httpGet = new HttpGet(receivedMsg.getHost() + "api/files/" + receivedMsg.getIntermediateId() + "?key=" + playserverKey);
        HttpResponse fileResponse = httpclient.execute(httpGet);
        log.info(fileResponse.getStatusLine());
        HttpEntity fileEntity = fileResponse.getEntity();
        // Compare the numeric status code; searching the status line text for "200" is brittle.
        if (fileResponse.getStatusLine().getStatusCode() != 200) {
            EntityUtils.consume(fileEntity);
            throw new IOException("File not found.");
        }
        tempFile = createDownloadTempFile(fileResponse.getFirstHeader("content-disposition"));
        try (InputStream fileIs = fileEntity.getContent();
             OutputStream fileOs = new FileOutputStream(tempFile)) {
            IOUtils.copy(fileIs, fileOs);
        }
        EntityUtils.consume(fileEntity);
    } finally {
        // Release the client's pooled connections, also when the download failed.
        httpclient.getConnectionManager().shutdown();
    }
    log.info("Download complete. Initiating word count generation");
    return processFile(tempFile, receivedMsg.getId());
}

/**
 * Creates an empty temp file whose name mirrors the file name announced in a
 * Content-Disposition header (e.g. {@code attachment; filename="notes.txt"}).
 *
 * @param contentDisposition the header, or {@code null} if the server sent none
 * @return a new, empty temporary file
 * @throws IOException if the temp file cannot be created
 */
private static File createDownloadTempFile(Header contentDisposition) throws IOException
{
    String fileName = "download";
    if (contentDisposition != null) {
        String value = contentDisposition.getValue();
        int idx = value.indexOf("filename=");
        if (idx >= 0) {
            String name = value.substring(idx + "filename=".length()).trim();
            int semi = name.indexOf(';');
            if (semi >= 0) {
                name = name.substring(0, semi).trim();
            }
            if (name.length() >= 2 && name.startsWith("\"") && name.endsWith("\"")) {
                name = name.substring(1, name.length() - 1);
            }
            // The header is server-controlled: keep only the last path component.
            name = new File(name).getName();
            if (!name.isEmpty()) {
                fileName = name;
            }
        }
    }
    int dot = fileName.lastIndexOf('.');
    String prefix = dot > 0 ? fileName.substring(0, dot) : fileName;
    // A null suffix makes createTempFile use ".tmp".
    String suffix = dot > 0 ? fileName.substring(dot).toLowerCase(java.util.Locale.ROOT) : null;
    // File.createTempFile rejects prefixes shorter than three characters.
    while (prefix.length() < 3) {
        prefix = prefix + "_";
    }
    return File.createTempFile(prefix, suffix);
} |
Code Block |
---|
language | java |
---|
title | Processing Files |
---|
|
/**
 * Runs {@code wc -w} on a downloaded file and stores the command's output in a new text file.
 *
 * <p>The input file is deleted before this method returns, whether or not the count succeeded.
 * Only Unix-like systems (Linux, other *nix, AIX, macOS) are supported.
 *
 * @param tempFile       the downloaded input file
 * @param originalFileId id of the job the file belongs to (currently unused)
 * @return a file, next to {@code tempFile}, containing the output of {@code wc -w}
 * @throws UnsupportedOperationException on operating systems without a {@code wc} implementation
 * @throws Exception if {@code wc} fails or produces no output file
 */
private File processFile(File tempFile, String originalFileId) throws Exception
{
    String operSystem = System.getProperty("os.name").toLowerCase();
    // TODO: windows impl
    boolean hasWc = operSystem.indexOf("nix") >= 0 || operSystem.indexOf("nux") >= 0
            || operSystem.indexOf("aix") >= 0 || operSystem.indexOf("mac") >= 0;
    if (!hasWc) {
        tempFile.delete();
        throw new UnsupportedOperationException("Word count extraction is not implemented for " + operSystem);
    }
    File workDir = tempFile.getAbsoluteFile().getParentFile();
    String name = tempFile.getName();
    int dot = name.lastIndexOf('.');
    String baseName = dot > 0 ? name.substring(0, dot) : name;
    // Use a distinct name: deriving "<base>.txt" would collide with a ".txt" input,
    // which is deleted below.
    File outFile = new File(workDir, baseName + "_wordcount.txt");
    try {
        // An argument list (not a single command string) keeps paths containing spaces intact.
        ProcessBuilder pb = new ProcessBuilder("wc", "-w", tempFile.getAbsolutePath());
        pb.directory(workDir);
        pb.redirectOutput(outFile);
        Process p = pb.start();
        StreamGobbler errorGobbler = new StreamGobbler(p.getErrorStream(), "ERROR", log);
        errorGobbler.start();
        int exitCode = p.waitFor();
        if (exitCode != 0) {
            outFile.delete();
            throw new Exception("wc exited with status " + exitCode + ". File is possibly corrupt.");
        }
    } finally {
        tempFile.delete();
    }
    if (!Files.exists(outFile.toPath()))
        throw new Exception("File not processed correctly. File is possibly corrupt.");
    return outFile;
} |
...
Java extractors can be created using the amqp-client jar file, which allows you to connect to the RabbitMQ bus and receive messages. The easiest way to get up and running is to use Maven to add all required dependencies. An example of an extractor written in Java can be found at Medici Extractor in Java.
Python
Python extractors will often be based on the packages pika and requests. These allow you to connect to the RabbitMQ message bus and easily send requests to Medici. A complete example of the Python extractor can be found at Medici Extractor in Python.
Calling R Scripts from Python
Coming soon...
Versus Extractors
Versus extractors serve to extract a signature from a file's content. These signatures, effectively a hash for the data, are typically numerical vectors which capture some semantically meaningful aspect of the content, so that two such signatures can then be compared using some distance measure. Within Versus, extractors operate on a data structure representing the content of a file, produced by a Versus adapter, and the returned signatures are compared by either a Versus similarity or distance measure. The combination of these adapters, extractors, and measures in turn composes a comparison, which can be used for relating files according to their contents.
Java
The main class sets up the comparison. This is done by adding the two files that need to be compared, as well as the adapter to load the files, the extractor to extract a feature from each file, and a measure to compare the two features.
Code Block |
---|
|
static public void main(String[] args) {
PairwiseComparison comparison = new PairwiseComparison();
comparison.setId(UUID.randomUUID().toString());
comparison.setFirstDataset(new File("data/test1.txt"));
comparison.setSecondDataset(new File("data/test2.txt"));
comparison.setAdapterId(TextAdapter.class.getName());
comparison.setExtractorId(TextHistogramExtractor.class.getName());
comparison.setMeasureId(LabelHistogramEuclidianDistanceMeasure.class.getName());
ExecutionEngine ee = new ExecutionEngine();
ee.submit(comparison, new ComparisonStatusHandler() {
@Override
public void onStarted() { |
Code Block |
---|
language | cpp |
---|
title | Main Function |
---|
|
/**
 * Entry point: connects to the AMQP broker whose IP address is given as the
 * only command-line argument and runs the event loop until it ends.
 *
 * @return 0 once the event loop has finished, -1 on a usage error
 */
int main(int argc, const char *argv[])
{
    // exactly one argument (the broker ip) is accepted
    if (argc == 2)
    {
        // the connection object lives for as long as the event loop runs
        MyConnection connection(argv[1]);
        Event::MainLoop::instance()->run();
        return 0;
    }

    // wrong number of arguments: print usage and fail
    std::cerr << "usage: " << argv[0] << " <ip>" << std::endl;
    return -1;
} |
Code Block |
---|
language | cpp |
---|
title | RabbitMQ Connection and Error Handling |
---|
|
/**
 * Constructor: starts connecting the TCP socket to port 5672 (the AMQP port) of
 * the given IPv4 address. If connect() reports failure immediately,
 * onFailure() is invoked right away.
 * @param ip IPv4 address of the broker
 */
MyConnection::MyConnection(const std::string &ip) :
    _socket(Event::MainLoop::instance(), this),
    _connection(nullptr),
    _channel(nullptr)
{
    if (!_socket.connect(Network::Ipv4Address(ip), 5672)) onFailure(&_socket);
}
/**
 * Destructor: releases the channel first and then the connection it was
 * created on. Deleting a null pointer is a no-op, so no checks are needed.
 */
MyConnection::~MyConnection()
{
    delete _channel;
    delete _connection;
}
/**
* Method that is called when the connection failed
* @param socket Pointer to the socket
*/
void MyConnection::onFailure(Network::TcpSocket *socket)
{
// report error
std::cout << "connect failure" << std::endl;
}
/**
* Method that is called when the connection timed out (which is also a failure)
* @param socket Pointer to the socket
*/
void MyConnection::onTimeout(Network::TcpSocket *socket)
{
// report error
std::cout << "connect timeout" << std::endl;
}
/**
* Method that is called when the TCP connection succeeded.
* Creates the AMQP connection (login guest/guest, vhost "/") and a channel on
* top of it, then declares a queue and a direct exchange and binds them.
* @param socket Pointer to the socket
*/
void MyConnection::onConnected(Network::TcpSocket *socket)
{
// report connection
std::cout << "connected" << std::endl;
// we are connected, leap out if there already is an amqp connection
if (_connection) return;
// create amqp connection, and a new channel
_connection = new AMQP::Connection(this, AMQP::Login("guest", "guest"), "/");
_channel = new AMQP::Channel(_connection, this);
// declare a queue and an exchange and bind them with routing key "key";
// the test message itself is published later, from onQueueBound()
_channel->declareQueue("my_queue");
_channel->declareExchange("my_exchange", AMQP::direct);
_channel->bindQueue("my_exchange", "my_queue", "key");
}
/**
* Method that is called when the socket is closed (as a result of a TcpSocket::close() call).
* Deletes the AMQP channel and connection, if any, and resets both pointers.
* @param socket Pointer to the socket
*/
void MyConnection::onClosed(Network::TcpSocket *socket)
{
// show
std::cout << "myconnection closed" << std::endl;
// close the channel and connection (channel first: it was created on the connection)
if (_channel) delete _channel;
if (_connection) delete _connection;
// set to null
_channel = nullptr;
_connection = nullptr;
}
/**
* Method that is called when the peer closed the connection.
* Performs the same cleanup as onClosed().
* @param socket Pointer to the socket
*/
void MyConnection::onLost(Network::TcpSocket *socket)
{
// report error
std::cout << "connection lost" << std::endl;
// close the channel and connection (channel first: it was created on the connection)
if (_channel) delete _channel;
if (_connection) delete _connection;
// set to null
_channel = nullptr;
_connection = nullptr;
}
/**
* Method that is called when data is received on the socket
* @param socket Pointer to the socket
* @param buffer Pointer to the filled input buffer
*/
void MyConnection::onData(Network::TcpSocket *socket, Network::Buffer *buffer)
{
// report how many bytes came in
std::cout << "received: " << buffer->size() << " bytes" << std::endl;
// leap out if there is no connection
if (!_connection) return;
// let the data be handled by the connection; parse() returns a byte count
size_t bytes = _connection->parse(buffer->data(), buffer->size());
// shrink the buffer by that count (presumably the bytes parse() consumed - TODO confirm)
buffer->shrink(bytes);
}
/**
* Method that is called when data needs to be sent over the network
*
* Note that the AMQP library does no buffering by itself. This means
* that this method should always send out all data or do the buffering
* itself.
*
* NOTE(review): any return value of _socket.write() is ignored here; confirm
* that it never performs a partial write, or buffer the remainder.
*
* @param connection The connection that created this output
* @param buffer Data to send
* @param size Size of the buffer
*/
void MyConnection::onData(AMQP::Connection *connection, const char *buffer, size_t size)
{
// send to the socket
_socket.write(buffer, size);
}
/**
* When the connection ends up in an error state this method is called.
* This happens when data comes in that does not match the AMQP protocol
*
* After this method is called, the connection is no longer in a valid
* state and can no longer be used. In normal circumstances this method is not called.
*
* @param connection The connection that entered the error state
* @param message Error message
*/
void MyConnection::onError(AMQP::Connection *connection, const std::string &message)
{
// report error
std::cout << "AMQP Connection error: " << message << std::endl;
}
/**
* Method that is called when the login attempt succeeded. After this method
* was called, the connection is ready to use
*
* @param connection The connection that can now be used
*/
void MyConnection::onConnected(AMQP::Connection *connection)
{
// show
std::cout << "AMQP login success" << std::endl;
// create channel if it does not yet exist (onConnected(TcpSocket*) normally created it already)
if (!_channel) _channel = new AMQP::Channel(connection, this);
}
/**
* Method that is called when the channel was successfully created.
* Only after the channel was created, you can use it for subsequent messages over it
* @param channel
*/
void MyConnection::onReady(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel ready, id: " << (int) channel->id() << std::endl;
}
/**
* An error has occurred on the channel
* @param channel
* @param message
*/
void MyConnection::onError(AMQP::Channel *channel, const std::string &message)
{
// show
std::cout << "AMQP channel error, id: " << (int) channel->id() << " - message: " << message << std::endl;
// the main channel caused an error, get rid of it
// NOTE(review): this deletes the channel from inside its own callback; confirm the
// AMQP library permits that.
delete _channel;
// reset pointer
_channel = nullptr;
}
/**
* Method that is called when the channel was paused
* @param channel
*/
void MyConnection::onPaused(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel paused" << std::endl;
}
/**
* Method that is called when the channel was resumed
* @param channel
*/
void MyConnection::onResumed(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel resumed" << std::endl;
}
/**
* Method that is called when a channel is closed
* @param channel
*/
void MyConnection::onClosed(AMQP::Channel *channel)
{
// show
std::cout << "AMQP channel closed" << std::endl;
}
/**
* Method that is called when a transaction was started
* @param channel
*/
void MyConnection::onTransactionStarted(AMQP::Channel *channel)
{
// show
std::cout << "AMQP transaction started" << std::endl;
}
/**
* Method that is called when a transaction was committed
* @param channel
*/
void MyConnection::onTransactionCommitted(AMQP::Channel *channel)
{
// show
std::cout << "AMQP transaction committed" << std::endl;
}
/**
* Method that is called when a transaction was rolled back
* @param channel
*/
void MyConnection::onTransactionRolledBack(AMQP::Channel *channel)
{
// show
std::cout << "AMQP transaction rolled back" << std::endl;
}
/**
* Method that is called when an exchange is declared
* @param channel
*/
void MyConnection::onExchangeDeclared(AMQP::Channel *channel)
{
// show
std::cout << "AMQP exchange declared" << std::endl;
}
/**
* Method that is called when an exchange is bound
* @param channel
*/
void MyConnection::onExchangeBound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Exchange bound" << std::endl;
}
/**
* Method that is called when an exchange is unbound
* @param channel
*/
void MyConnection::onExchangeUnbound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Exchange unbound" << std::endl;
}
/**
* Method that is called when an exchange is deleted
* @param channel
*/
void MyConnection::onExchangeDeleted(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Exchange deleted" << std::endl;
}
/**
* Method that is called when a queue is declared
* @param channel
* @param name name of the queue
* @param messageCount number of messages in queue
* @param consumerCount number of active consumers
*/
void MyConnection::onQueueDeclared(AMQP::Channel *channel, const std::string &name, uint32_t messageCount, uint32_t consumerCount)
{
// show
std::cout << "AMQP Queue declared" << std::endl;
}
/**
* Method that is called when a queue is bound.
* Publishes a test message to "my_exchange" with routing key "invalid-key" and the
* mandatory flag. The queue was bound with key "key", so this message matches no
* binding and is expected to come back through onReturned().
* @param channel
*/
void MyConnection::onQueueBound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Queue bound" << std::endl;
_channel->publish("my_exchange", "invalid-key", AMQP::mandatory, "this is the message");
}
/**
* Method that is called when a queue is deleted
* @param channel
* @param messageCount number of messages deleted along with the queue
*/
void MyConnection::onQueueDeleted(AMQP::Channel *channel, uint32_t messageCount)
{
// show
std::cout << "AMQP Queue deleted" << std::endl;
}
/**
* Method that is called when a queue is unbound
* @param channel
*/
void MyConnection::onQueueUnbound(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Queue unbound" << std::endl;
}
/**
* Method that is called when a queue is purged
* @param channel
* @param messageCount number of messages purged
*/
void MyConnection::onQueuePurged(AMQP::Channel *channel, uint32_t messageCount)
{
// show
std::cout << "AMQP Queue purged" << std::endl;
}
/**
* Method that is called when the quality-of-service was changed
* This is the result of a call to Channel::setQos()
* @param channel
*/
void MyConnection::onQosSet(AMQP::Channel *channel)
{
// show
std::cout << "AMQP Qos set" << std::endl;
}
/**
* Method that is called when a consumer was started
* This is the result of a call to Channel::consume()
* @param channel the channel on which the consumer was started
* @param tag the consumer tag
*/
void MyConnection::onConsumerStarted(AMQP::Channel *channel, const std::string &tag)
{
// show
std::cout << "AMQP consumer started" << std::endl;
}
/**
* Method that is called when a message has been received on a channel
* @param channel the channel on which the consumer was started
* @param message the consumed message
* @param deliveryTag the delivery tag, you need this to acknowledge the message
* @param consumerTag the consumer identifier that was used to retrieve this message
* @param redelivered is this a redelivered message?
*/
void MyConnection::onReceived(AMQP::Channel *channel, const AMQP::Message &message, uint64_t deliveryTag, const std::string &consumerTag, bool redelivered)
{
// show
std::cout << "AMQP consumed: " << message.message() << std::endl;
// ack the message
channel->ack(deliveryTag);
}
/**
* Method that is called when a message you tried to publish was returned
* by the server. This only happens when the 'mandatory' or 'immediate' flag
* was set with the Channel::publish() call.
* @param channel the channel on which the message was returned
* @param message the returned message
* @param code the reply code
* @param text human readable reply reason
*/
void MyConnection::onReturned(AMQP::Channel *channel, const AMQP::Message &message, int16_t code, const std::string &text)
{
// show
std::cout << "AMQP message returned: " << text << std::endl;
}
/**
* Method that is called when a consumer was stopped
* This is the result of a call to Channel::cancel()
* @param channel the channel on which the consumer was stopped
* @param tag the consumer tag
*/
void MyConnection::onConsumerStopped(AMQP::Channel *channel, const std::string &tag)
{
// show
std::cout << "AMQP consumer stopped" << std::endl;
} |
...
Code Block |
---|
theme | Emacs |
---|
language | py |
---|
title | Instantiating the logger and starting the extractor |
---|
|
def main():
    """Configure logging and read the command-line arguments into module globals.

    Expected arguments: RabbitMQ username, RabbitMQ password, Medici REST API key and
    RabbitMQ exchange name (sys.argv[1] .. sys.argv[4]).  With any other number of
    arguments a usage message is logged and the process exits with status 1.
    """
    global logger
    # name of receiver
    receiver='ExamplePythonExtractor'
    # configure the logging system
    logging.basicConfig(format="%(asctime)-15s %(name)-10s %(levelname)-7s : %(message)s", level=logging.WARN)
    logger = logging.getLogger(receiver)
    logger.setLevel(logging.DEBUG)
    # sys.argv[0] is the script itself, so four arguments mean len(sys.argv) == 5
    if len(sys.argv) != 5:
        logger.error("Input RabbitMQ username, followed by RabbitMQ password, Medici REST API key and RabbitMQ exchange name.")
        sys.exit(1)
    global playserverKey
    playserverKey = sys.argv[3]
    global exchange_name
    exchange_name = sys.argv[4]
Code Block |
---|
theme | Emacs |
---|
language | py |
---|
title | Connecting to RabbitMQ |
---|
|
# connect to rabbitmq using input username and password
credentials = pika.PlainCredentials(sys.argv[1], sys.argv[2])
parameters = pika.ConnectionParameters(credentials=credentials)
connection = pika.BlockingConnection(parameters)
# connect to channel
channel = connection.channel()
# declare the exchange given on the command line (sys.argv[4], stored in the global exchange_name)
channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
# declare a durable queue named after this extractor
channel.queue_declare(queue=receiver, durable=True)
# connect queue and exchange: receive notifications about text/plain files
channel.queue_bind(queue=receiver, exchange=exchange_name, routing_key='*.file.text.plain')
# create listener; no_ack=False means on_message must acknowledge each message itself
# NOTE(review): this is the pika < 1.0 signature; with pika >= 1.0 use
# channel.basic_consume(queue=receiver, on_message_callback=on_message, auto_ack=False)
channel.basic_consume(on_message, queue=receiver, no_ack=False)
# start listening
logger.info("Waiting for messages. To exit press CTRL+C")
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
# close connection
connection.close()
Code Block |
---|
language | py |
---|
title | Processing Messages |
---|
|
def _publish_status(channel, header, statusreport, status):
    """Stamp *statusreport* with *status* and the current time and send it to the reply queue."""
    statusreport['status'] = status
    statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
    channel.basic_publish(exchange='',
                          routing_key=header.reply_to,
                          properties=pika.BasicProperties(correlation_id=header.correlation_id),
                          body=json.dumps(statusreport))


def on_message(channel, method, header, body):
    """Handle one extraction request from RabbitMQ.

    The JSON *body* must contain 'host', 'id' and 'intermediateId'.  The intermediate file is
    downloaded from Medici, a word count is created for it, and progress is reported to the
    requester's reply queue ('Downloading file.', 'Creating word count.', then
    'Error processing.' on failure, and always 'DONE.').  The message is acknowledged whether
    or not processing succeeded, so a failing message is not left unacknowledged forever.
    """
    global logger
    statusreport = {}
    inputfile = None
    # bound before parsing so the error handlers can always log it
    fileid = None
    try:
        # parse body back from json
        jbody = json.loads(body)
        host = jbody['host']
        fileid = jbody['id']
        intermediatefileid = jbody['intermediateId']
        if not host.endswith('/'):
            host += '/'
        # for status reports
        statusreport['file_id'] = fileid
        statusreport['extractor_id'] = 'wordCount'
        # print what we are doing
        logger.debug("[%s] started processing", fileid)
        # fetch data
        _publish_status(channel, header, statusreport, 'Downloading file.')
        url = host + 'api/files/' + intermediatefileid + '?key=' + playserverKey
        r = requests.get(url, stream=True)
        r.raise_for_status()
        (fd, inputfile) = tempfile.mkstemp()
        # the response chunks are bytes, so the file must be opened in binary mode
        with os.fdopen(fd, "wb") as f:
            for chunk in r.iter_content(chunk_size=10*1024):
                f.write(chunk)
        # create word count
        _publish_status(channel, header, statusreport, 'Creating word count.')
        # the queue is only bound to text/plain files
        ext = 'txt'
        create_word_count(inputfile, ext, host, fileid)
        logger.debug("[%s] finished processing", fileid)
    except subprocess.CalledProcessError as e:
        logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output)
        _publish_status(channel, header, statusreport, 'Error processing.')
    except Exception:
        logger.exception("[%s] error processing", fileid)
        _publish_status(channel, header, statusreport, 'Error processing.')
    finally:
        _publish_status(channel, header, statusreport, 'DONE.')
        # Ack
        channel.basic_ack(method.delivery_tag)
        if inputfile is not None:
            try:
                os.remove(inputfile)
            except OSError:
                pass
Code Block |
---|
language | py |
---|
title | Example Job: Word Count |
---|
|
def create_word_count(inputfile, ext, host, fileid):
# Run `wc` and upload the resulting word count for Medici file `fileid` on `host`
# (the upload code is not part of this excerpt).
# NOTE(review): the mkstemp() call below rebinds `inputfile` to a NEW, empty temp file, so
# `wc` runs on that empty file instead of the downloaded one; its descriptor `fd` is also
# never closed in the visible code. TODO confirm against the full source.
global logger
(fd, inputfile)=tempfile.mkstemp(suffix='.' + ext)
try:
# make syscall to wc; check_output raises CalledProcessError on a non-zero exit status,
# and the output it returns is discarded here
subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT)
# NOTE(review): `wcfile` is not defined in this excerpt; presumably it should be the file
# holding wc's output - TODO confirm.
if(os.path.getsize(wcfile) == 0):
raise Exception("File is empty.")
# upload word count file |
Calling R Scripts from Python
Coming soon...
...
Versus extractors serve to extract a signature from a file's content. These signatures, effectively a hash for the data, are typically numerical vectors which capture some semantically meaningful aspect of the content, so that two such signatures can then be compared using some distance measure. Within Versus, extractors operate on a data structure representing the content of a file, produced by a Versus adapter, and the returned signatures are compared by either a Versus similarity or distance measure. The combination of these adapters, extractors, and measures in turn composes a comparison, which can be used for relating files according to their contents.
...
Code Block |
---|
language | java |
---|
title | PDF Adapter |
---|
|
/**
 * Versus adapter for PDF documents. It exposes the embedded images ({@link HasRGBPixels}),
 * the words ({@link HasText}) and the line graphics ({@link HasLineGraphics}) of a PDF.
 * Each kind of content is parsed on first access and cached.
 */
public class PDFAdapter implements FileLoader, HasRGBPixels, HasText, HasLineGraphics {
    private File file;
    private double[][][] pixels;
    private List<String> words;
    private List<Path2D> graphics;

    /**
     * Demo: combines three pairwise comparisons of data/test1.pdf and data/test2.pdf with
     * weights 0.7, 0.2 and 0.1 using the ComprehensiveEngine and prints the result.
     */
    static public void main(String[] args) {
        List<Double> weights = new ArrayList<Double>();
        List<PairwiseComparison> comparisons = new ArrayList<PairwiseComparison>();
        PairwiseComparison comparison = new PairwiseComparison();
        comparison.setId(UUID.randomUUID().toString());
        comparison.setFirstDataset(new File("data/test1.pdf"));
        comparison.setSecondDataset(new File("data/test2.pdf"));
        comparison.setAdapterId(PDFAdapter.class.getName());
        comparison.setExtractorId(TextHistogramExtractor.class.getName());
        comparison.setMeasureId(LabelHistogramEuclidianDistanceMeasure.class.getName());
        comparisons.add(comparison);
        weights.add(0.7);
        comparison = new PairwiseComparison();
        comparison.setId(UUID.randomUUID().toString());
        comparison.setFirstDataset(new File("data/test1.pdf"));
        comparison.setSecondDataset(new File("data/test2.pdf"));
        comparison.setAdapterId(PDFAdapter.class.getName());
        comparison.setExtractorId(TextHistogramExtractor.class.getName());
        comparison.setMeasureId(LabelHistogramEuclidianDistanceMeasure.class.getName());
        comparisons.add(comparison);
        weights.add(0.2);
        comparison = new PairwiseComparison();
        comparison.setId(UUID.randomUUID().toString());
        comparison.setFirstDataset(new File("data/test1.pdf"));
        comparison.setSecondDataset(new File("data/test2.pdf"));
        comparison.setAdapterId(PDFAdapter.class.getName());
        comparison.setExtractorId(TextHistogramExtractor.class.getName());
        comparison.setMeasureId(LabelHistogramEuclidianDistanceMeasure.class.getName());
        comparisons.add(comparison);
        weights.add(0.1);
        ComprehensiveEngine engine = new ComprehensiveEngine();
        Double d = engine.compute(comparisons, weights);
        System.out.println(d);
        System.exit(0);

        // NOTE: never reached because of System.exit(0) above; kept to show how a single
        // comparison is submitted asynchronously to the ExecutionEngine.
        ExecutionEngine ee = new ExecutionEngine();
        ee.submit(comparison, new ComparisonStatusHandler() {
            @Override
            public void onStarted() {
                System.out.println("STARTED : ");
            }

            @Override
            public void onFailed(String msg, Throwable e) {
                System.out.println("FAILED : " + msg);
                e.printStackTrace();
                System.exit(0);
            }

            @Override
            public void onDone(double value) {
                System.out.println("DONE : " + value);
                System.exit(0);
            }

            @Override
            public void onAborted(String msg) {
                System.out.println("ABORTED : " + msg);
                System.exit(0);
            }
        });
    }

    public PDFAdapter() {
    }

    // ----------------------------------------------------------------------
    // FileLoader
    // ----------------------------------------------------------------------

    @Override
    public void load(File file) {
        this.file = file;
    }

    @Override
    public String getName() {
        return "PDF Document";
    }

    @Override
    public List<String> getSupportedMediaTypes() {
        List<String> mediaTypes = new ArrayList<String>();
        mediaTypes.add("application/pdf");
        return mediaTypes;
    }

    // ----------------------------------------------------------------------
    // HasRGBPixels
    // ----------------------------------------------------------------------

    @Override
    public double getRGBPixel(int row, int column, int band) {
        if ((pixels == null) && (getRGBPixels() == null)) {
            return Double.NaN;
        } else {
            return pixels[row][column][band];
        }
    }

    @Override
    public double[][][] getRGBPixels() {
        if (pixels == null) {
            // create monster array.
            try {
                loadImages();
            } catch (IOException e) {
                e.printStackTrace();
                return null;
            }
        }
        return pixels;
    }

    /** Collects every image in the PDF into {@link #pixels}. */
    private void loadImages() throws IOException {
        PDFParser parser = new PDFParser(new FileInputStream(file), PDFParser.EXTRACT_IMAGES);
        try {
            // get all images in the pdf document
            List<PDFObjectImage> images = new ArrayList<PDFObjectImage>();
            for (int i = 0; i < parser.getPageCount(); i++) {
                parser.parse(i);
                for (PDFObject po : parser.getObjects()) {
                    if (po instanceof PDFObjectImage) {
                        images.add((PDFObjectImage) po);
                    }
                }
            }
            // create a virtual image that is all the images combined
            // first column is the image number
            // second column is pixel (col + row*width)
            // third column is RGB value
            pixels = new double[images.size()][][];
            for (int i = 0; i < images.size(); i++) {
                PDFObjectImage poi = images.get(i);
                int w = poi.getImage().getWidth();
                int h = poi.getImage().getHeight();
                int[] rgb = poi.getImage().getRGB(0, 0, w, h, null, 0, w);
                pixels[i] = new double[rgb.length][3];
                for (int j = 0; j < rgb.length; j++) {
                    pixels[i][j][0] = (rgb[j] & 0xff0000) >> 16;
                    pixels[i][j][1] = (rgb[j] & 0x00ff00) >> 8;
                    pixels[i][j][2] = (rgb[j] & 0x0000ff) >> 0;
                }
            }
        } finally {
            // close the parser, also when parsing failed
            parser.close();
        }
    }

    // ----------------------------------------------------------------------
    // HasText
    // ----------------------------------------------------------------------

    @Override
    public List<String> getWords() {
        if (words == null) {
            words = new ArrayList<String>();
            try {
                PDFParser parser = new PDFParser(new FileInputStream(file), PDFParser.EXTRACT_TEXT);
                try {
                    PDFGroupingText textgroup = new PDFGroupingText(PDFGroupingText.REMOVE_EMPTY_LINES);
                    for (int i = 0; i < parser.getPageCount(); i++) {
                        parser.parse(i);
                        for (PDFObject po : textgroup.group(parser.getObjects())) {
                            if (po instanceof PDFObjectText) {
                                // split on runs of non-word characters; skip empty tokens
                                for (String s : ((PDFObjectText) po).getText().split("\\W+")) { //$NON-NLS-1$
                                    if (!s.isEmpty()) {
                                        words.add(s);
                                    }
                                }
                            }
                        }
                    }
                } finally {
                    parser.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return words;
    }

    // ----------------------------------------------------------------------
    // HasLineGraphics
    // ----------------------------------------------------------------------

    @Override
    public List<Path2D> getLineGraphics() {
        if (graphics == null) {
            graphics = new ArrayList<Path2D>();
            try {
                PDFParser parser = new PDFParser(new FileInputStream(file), PDFParser.EXTRACT_GRAPHICS);
                try {
                    PDFGroupingGraphics textgroup = new PDFGroupingGraphics();
                    for (int i = 0; i < parser.getPageCount(); i++) {
                        parser.parse(i);
                        for (PDFObject po : textgroup.group(parser.getObjects())) {
                            if (po instanceof PDFObjectGraphics) {
                                graphics.add(((PDFObjectGraphics) po).getPath());
                            }
                        }
                    }
                } finally {
                    parser.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return graphics;
    }
}
|
The text adapter takes a text file, loads the entire file, splits the text into words and returns a list of all words in the text. The words are kept in their original order, so it is possible to read the original information of the file by reading the words in the order in which they are returned by getWords().
Code Block |
---|
language | java |
---|
title | Text Adapter |
---|
|
/**
 * Versus adapter that loads a plain-text file and exposes its words ({@link HasText}).
 * The words are read on first access, split on single spaces, kept in file order and cached.
 */
public class TextAdapter implements FileLoader, HasText {
    private File file;
    private List<String> words;

    public TextAdapter() {}

    // ----------------------------------------------------------------------
    // FileLoader
    // ----------------------------------------------------------------------

    @Override
    public void load(File file) {
        this.file = file;
    }

    @Override
    public String getName() {
        return "Text Document";
    }

    @Override
    public List<String> getSupportedMediaTypes() {
        List<String> mediaTypes = new ArrayList<String>();
        mediaTypes.add("text/*");
        return mediaTypes;
    }

    // ----------------------------------------------------------------------
    // HasText
    // ----------------------------------------------------------------------

    @Override
    public List<String> getWords() {
        if (words == null) {
            words = new ArrayList<String>();
            try {
                BufferedReader br = new BufferedReader(new FileReader(file));
                try {
                    String line;
                    while ((line = br.readLine()) != null) {
                        for (String w : line.split(" ")) {
                            // blank lines and repeated spaces yield empty tokens, which are not words
                            if (!w.isEmpty()) {
                                words.add(w);
                            }
                        }
                    }
                } finally {
                    br.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return words;
    }
} |
The extractor takes the words returned by the adapter and counts the occurrences of each word. At this point we are left with a histogram of all words and how often they occur in the text; we can no longer read the text, since the information about the order of the words is lost.
Code Block |
---|
language | java |
---|
title | Text Histogram Extractor |
---|
|
publicpublic class TextHistogramExtractor implements Extractor
{
@Override
public Adapter newAdapter() {
throw (new RuntimeException("Not supported."));
}
@Override
public String getName() {
return "Text Histogram Extractor";
}
@Override
public Set<Class<? extends Adapter>> supportedAdapters() {
Set<Class<? extends Adapter>> adapters = new HashSet<Class<? extends Adapter>>();
adapters.add(HasText.class);
return adapters;
}
@Override
public Class<? extends Descriptor> getFeatureType() {
return LabelHistogramDescriptor.class;
}
@Override
public Descriptor extract(Adapter adapter) throws Exception {
if (adapter instanceof HasText) {
LabelHistogramDescriptor desc = new LabelHistogramDescriptor();
for for (String word : ((HasText) adapter).getWords()) {
desc.increaseBin(word);
}
return return desc;
} else {
throw new UnsupportedTypeException();
}
}
@Override
public boolean hasPreview(){
return false;
}
@Override
public String previewName(){
return null;
}
} |
To compare two texts we use the Euclidean distance between their two histograms. First we normalize each histogram, so that a large text can be compared with a small text; next we compare each bin of the two histograms. If a bin is missing from either histogram, it is assumed to have a value of 0.
Code Block |
---|
language | java |
---|
title | Euclidian Distance Measure |
---|
|
public class LabelHistogramEuclidianDistanceMeasure implements Measure
{
@Override
public SimilarityPercentage normalize(Similarity similarity) {
return new SimilarityPercentage(1 - similarity.getValue());
}
@Override
public String getFeatureType() {
return LabelHistogramDescriptor.class.getName();
}
@Override
public String getName() {
return "Histogram Distance";
}
@Override
public Class<LabelHistogramEuclidianDistanceMeasure> getType() {
return LabelHistogramEuclidianDistanceMeasure.class;
}
// correlation
@Override
public Similarity compare(Descriptor desc1, Descriptor desc2) throws Exception {
if ((desc1 instanceof LabelHistogramDescriptor) && (desc2 instanceof LabelHistogramDescriptor)) {
LabelHistogramDescriptor lhd1 = (LabelHistogramDescriptor) desc1;
LabelHistogramDescriptor lhd2 = (LabelHistogramDescriptor) desc2;
// get all possible labels
Set<String> labels = new HashSet<String>();
labels.addAll(lhd1.getLabels());
labels.addAll(lhd2.getLabels());
// normalize
lhd1.normalize();
lhd2.normalize();
// compute distance
double sum = 0;
for for (String s : labels) {
Double b1 = lhd1.getBin(s);
Double b2 = lhd2.getBin(s);
if if (b1 == null) {
sum += b2 * b2;
} else if (b2 == null) {
sum += b1 * b1;
} else {
sum += (b1 - b2) * (b1 - b2);
}
}
return new SimilarityNumber(Math.sqrt(sum), 0, 1, 0);
} else {
throw new UnsupportedTypeException();
}
}
} |