public static void main(String[] argv) throws Exception {
// setup connection parameters
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rabbitmqhost);
if ((username != null) && (password != null)) {
factory.setUsername(username);
factory.setPassword(password);
}
// connect to rabitmq
Connection connection = factory.newConnection();
// connect to channel
final Channel channel = connection.createChannel();
// declare the exchange
channel.exchangeDeclare(exchange, "topic", true);
// declare the queue
channel.queueDeclare(extractorName, true, false, false, null);
// connect queue and exchange
channel.queueBind(extractorName, exchange, messageType);
// create listener
channel.basicConsume(extractorName, false, "", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties header, byte[] body) throws IOException {
WordCount wc = new WordCount();
wc.onMessage(channel, envelope.getDeliveryTag(), header, new String(body));
}
});
// start listening
logger.info("[*] Waiting for messages. To exit press CTRL+C");
while (true) {
Thread.sleep(1000);
}
} |