You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

This page will describe how to create an extractor using java. A lot of this code can be copied and pasted. The only major part is the processFile function. You can find this example also in our code repository at:  https://opensource.ncsa.illinois.edu/stash/projects/MMDB/repos/extractors-examples/browse/java

The first step in the example is to setup some global variables that will be used in the rest of the code.

configuration
    // name where rabbitmq is running
    private static String rabbitmqhost = "localhost";

    // name to show in rabbitmq queue list
    private static String exchange = "medici";

    // name to show in rabbitmq queue list
    private static String extractorName = "wordCount";

    // username and password to connect to rabbitmq
    private static String username = null;
    private static String password = null;

    // accept any type of file that is text
    private static String messageType = "*.file.text.#";

    // secret key used to connect to medici, this will eventually be
    // part of the message received.
    private static String secretKey = "r1ek3rs";

A convenience function to return status messages to Medici

statusUpdate
    private void statusUpdate(Channel channel, AMQP.BasicProperties header, String fileid, String status) throws IOException {
        logger.debug("[" + fileid + "] : " + status);
        Map<String, Object> statusreport = new HashMap<String, Object>();
        statusreport.put("file_id", fileid);
        statusreport.put("extractor_id", extractorName);
        statusreport.put("status", status);
        statusreport.put("start", dateformat.format(new Date()));
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(header.getCorrelationId()).build();
        channel.basicPublish(exchange, header.getReplyTo(), props, mapper.writeValueAsBytes(statusreport));
    }

The main method will listen for messages on the message bus.

main
     public static void main(String[] argv) throws Exception {
        // setup connection parameters
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(rabbitmqhost);
        if ((username != null) && (password != null)) {
            factory.setUsername(username);
            factory.setPassword(password);
        }
        // connect to rabitmq
        Connection connection = factory.newConnection();
        // connect to channel
        final Channel channel = connection.createChannel();
        // declare the exchange
        channel.exchangeDeclare(exchange, "topic", true);
        // declare the queue
        channel.queueDeclare(extractorName, true, false, false, null);
        // connect queue and exchange
        channel.queueBind(extractorName, exchange, messageType);
        // create listener
        channel.basicConsume(extractorName, false, "", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties header, byte[] body) throws IOException {
                WordCount wc = new WordCount();
                wc.onMessage(channel, envelope.getDeliveryTag(), header, new String(body));
            }
        });
        // start listening
        logger.info("[*] Waiting for messages. To exit press CTRL+C");
        while (true) {
            Thread.sleep(1000);
        }
    }
  • No labels