Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titlestatusUpdatemain
     public static void main(String[] argv) throws Exception {
        // setup connection parameters
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(rabbitmqhost);
        if ((username != null) && (password != null)) {
            factory.setUsername(username);
            factory.setPassword(password);
        }
        // connect to rabitmq
        Connection connection = factory.newConnection();
        // connect to channel
        final Channel channel = connection.createChannel();
        // declare the exchange
        channel.exchangeDeclare(exchange, "topic", true);
        // declare the queue
        channel.queueDeclare(extractorName, true, false, false, null);
        // connect queue and exchange
        channel.queueBind(extractorName, exchange, messageType);
        // create listener
        channel.basicConsume(extractorName, false, "", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties header, byte[] body) throws IOException {
                WordCount wc = new WordCount();
                wc.onMessage(channel, envelope.getDeliveryTag(), header, new String(body));
            }
        });
        // start listening
        logger.info("[*] Waiting for messages. To exit press CTRL+C");
        while (true) {
            Thread.sleep(1000);
        }
    }