...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
protected void startExtractor(String rabbitMQUsername, String rabbitMQpassword) { try{ //Open channel and declare exchange and consumer ConnectionFactory factory = new ConnectionFactory(); factory.setHost(serverAddr); factory.setUsername(rabbitMQUsername); factory.setPassword(rabbitMQpassword); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); channel.queueDeclare(QUEUE_NAME,DURABLE,EXCLUSIVE,AUTO_DELETE,null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.file.text.plain.#"); this.channel = channel; // create listener channel.basicConsume(QUEUE_NAME, false, CONSUMER_TAG, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { messageReceived = new String(body); long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) System.out.println(" {x} Received '" + messageReceived + "'"); replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(); replyTo = properties.getReplyTo(); processMessageReceived(); System.out.println(" [x] Done"); channel.basicAck(deliveryTag, false); } }); // start listening System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); while (true) { Thread.sleep(1000); } } catch(Exception e){ e.printStackTrace(); System.exit(1); } } |
Code Block | ||||
---|---|---|---|---|
| ||||
protected void processMessageReceived() {
try {
try {
ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this);
jobReceived = getRepresentation(messageReceived, ExtractionJob.class);
File textFile = extrServ.processJob(jobReceived);
jobReceived.setFlag("wasText");
log.info("Word count extraction complete. Returning word count file as intermediate result.");
sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log);
uploadIntermediate(textFile, "text/plain", log);
textFile.delete();
sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
} catch (Exception ioe) {
log.error("Could not finish extraction job.", ioe);
sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log);
sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
}
} catch(Exception e) {
e.printStackTrace();
System.exit(1);
}
} |
Anchor | ||||
---|---|---|---|---|
|
Code Block | ||||
---|---|---|---|---|
| ||||
#include <amqpcpp.h> namespace CPPExample { class RabbitMQConnectionHandler : public AMQP::ConnectionHandler { /** * Method that is called by the AMQP library every time it has data * available that should be sent to RabbitMQ. * @param connection pointer to the main connection object * @param data memory buffer with the data that should be sent to RabbitMQ * @param size size of the buffer */ virtual void onData(AMQP::Connection *connection, const char *data, size_t size) { // @todo // Add your own implementation, for example by doing a call to the // send() system call. But be aware that the send() call may not // send all data at once, so you also need to take care of buffering // the bytes that could not immediately be sent, and try to send // them again when the socket becomes writable again } /** * Method that is called by the AMQP library when the login attempt * succeeded. After this method has been called, the connection is ready * to use. * @param connection The connection that can now be used */ virtual void onConnected(Connection *connection) { // @todo // add your own implementation, for example by creating a channel // instance, and start publishing or consuming } /** * Method that is called by the AMQP library when a fatal error occurs * on the connection, for example because data received from RabbitMQ * could not be recognized. * @param connection The connection on which the error occured * @param message A human readable error message */ virtual void onError(Connection *connection, const std::string &message) { // @todo // add your own implementation, for example by reporting the error // to the user of your program, log the error, and destruct the // connection object because it is no longer in a usable state } }; } |
...