This page will describe how to create an extractor using java. A lot of this code can be copied and pasted. The only major part is the processFile function. You can find this example also in our code repository at: https://opensource.ncsa.illinois.edu/stash/projects/MMDB/repos/extractors-examples/browse/java
The first step in the example is to setup some global variables that will be used in the rest of the code.
configuration
// name where rabbitmq is running private static String rabbitmqhost = "localhost"; // name to show in rabbitmq queue list private static String exchange = "medici"; // name to show in rabbitmq queue list private static String extractorName = "wordCount"; // username and password to connect to rabbitmq private static String username = null; private static String password = null; // accept any type of file that is text private static String messageType = "*.file.text.#"; // secret key used to connect to medici, this will eventually be // part of the message received. private static String secretKey = "r1ek3rs";
A convenience function to return status messages to Medici
statusUpdate
private void statusUpdate(Channel channel, AMQP.BasicProperties header, String fileid, String status) throws IOException { logger.debug("[" + fileid + "] : " + status); Map<String, Object> statusreport = new HashMap<String, Object>(); statusreport.put("file_id", fileid); statusreport.put("extractor_id", extractorName); statusreport.put("status", status); statusreport.put("start", dateformat.format(new Date())); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(header.getCorrelationId()).build(); channel.basicPublish(exchange, header.getReplyTo(), props, mapper.writeValueAsBytes(statusreport)); }
The main method will listen for messages on the message bus.
main
public static void main(String[] argv) throws Exception { // setup connection parameters ConnectionFactory factory = new ConnectionFactory(); factory.setHost(rabbitmqhost); if ((username != null) && (password != null)) { factory.setUsername(username); factory.setPassword(password); } // connect to rabitmq Connection connection = factory.newConnection(); // connect to channel final Channel channel = connection.createChannel(); // declare the exchange channel.exchangeDeclare(exchange, "topic", true); // declare the queue channel.queueDeclare(extractorName, true, false, false, null); // connect queue and exchange channel.queueBind(extractorName, exchange, messageType); // create listener channel.basicConsume(extractorName, false, "", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties header, byte[] body) throws IOException { WordCount wc = new WordCount(); wc.onMessage(channel, envelope.getDeliveryTag(), header, new String(body)); } }); // start listening logger.info("[*] Waiting for messages. To exit press CTRL+C"); while (true) { Thread.sleep(1000); } }