Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
theme: Emacs
language: java
title: Connecting to RabbitMQ
protected void startExtractor(String rabbitMQUsername,
	String rabbitMQpassword) {
	try{ 
 		//Open channel and declare exchange and consumer
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(serverAddr);
		factory.setUsername(rabbitMQUsername);
		factory.setPassword(rabbitMQpassword);
		Connection connection = factory.newConnection();

 		final Channel channel = connection.createChannel();
		channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

		channel.queueDeclare(QUEUE_NAME,DURABLE,EXCLUSIVE,AUTO_DELETE,null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.file.text.plain.#");
 
 		this.channel = channel;

 		// create listener
		channel.basicConsume(QUEUE_NAME, false, CONSUMER_TAG, new DefaultConsumer(channel) {
 			@Override
 			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				messageReceived = new String(body);
 				long deliveryTag = envelope.getDeliveryTag();
 				// (process the message components here ...)
				System.out.println(" {x} Received '" + messageReceived + "'");
 
				replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
				replyTo = properties.getReplyTo();
 
				processMessageReceived();
				System.out.println(" [x] Done");
				channel.basicAck(deliveryTag, false);
			}
		});

 		// start listening 
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
 		while (true) {
			Thread.sleep(1000);
		}
	}
 	catch(Exception e){
		e.printStackTrace();
		System.exit(1);
	} 
}
Code Block
language: java
title: Processing Messages Received From RabbitMQ
/**
 * Processes the message currently held in {@code messageReceived}.
 *
 * <p>The message is converted to an {@code ExtractionJob}, handed to an
 * {@code ExampleJavaExtractorService}, and the resulting file is uploaded as an
 * intermediate result. Status messages are sent along the way and a final
 * {@code "DONE."} status is sent whether or not the extraction succeeded.
 *
 * <p>The intermediate file is deleted even when a later step fails. If the message
 * could not be converted into a job, the failure is logged but no status is sent,
 * because there is no job id to report against.
 *
 * <p>An exception escaping the failure handling itself is printed and terminates the
 * JVM with exit status 1.
 */
protected void processMessageReceived() {
  try {
    File textFile = null;
    // Forget the previous job so a parse failure below cannot be reported
    // against the id of an earlier message.
    jobReceived = null;
    try {
      ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this);
      jobReceived = getRepresentation(messageReceived, ExtractionJob.class);

      textFile = extrServ.processJob(jobReceived);

      jobReceived.setFlag("wasText");

      log.info("Word count extraction complete. Returning word count file as intermediate result.");
      sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log);

      uploadIntermediate(textFile, "text/plain", log);

      sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);

    } catch (Exception ioe) {
      log.error("Could not finish extraction job.", ioe);
      // jobReceived is still null when the message itself could not be parsed.
      if (jobReceived != null) {
        sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log);
        sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
      }
    } finally {
      // Remove the intermediate file on every path, including failed uploads.
      if (textFile != null) {
        textFile.delete();
      }
    }
  } catch(Exception e) {
      e.printStackTrace();
      System.exit(1);
  }
}

 

 

Anchor
C++
C++
C++

Code Block
language: cpp
title: Connecting to RabbitMQ
#include <amqpcpp.h>

namespace CPPExample {

  /**
   *  Skeleton connection handler for the AMQP library. Each callback below is
   *  an empty stub marked @todo; fill them in with the application's own
   *  socket handling, channel setup and error reporting.
   */
  class RabbitMQConnectionHandler : public AMQP::ConnectionHandler {
      /**
      *  Method that is called by the AMQP library every time it has data
      *  available that should be sent to RabbitMQ.
      *  @param  connection  pointer to the main connection object
      *  @param  data        memory buffer with the data that should be sent to RabbitMQ
      *  @param  size        size of the buffer
      */
      virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
      {
         // @todo
         //  Add your own implementation, for example by doing a call to the
         //  send() system call. But be aware that the send() call may not
         //  send all data at once, so you also need to take care of buffering
         //  the bytes that could not immediately be sent, and try to send
         //  them again when the socket becomes writable again
      }

      /**
      *  Method that is called by the AMQP library when the login attempt
      *  succeeded. After this method has been called, the connection is ready
      *  to use.
      *  @param  connection      The connection that can now be used
      */
      virtual void onConnected(AMQP::Connection *connection)
      {
         // @todo
         //  add your own implementation, for example by creating a channel
         //  instance, and start publishing or consuming
      }

      /**
      *  Method that is called by the AMQP library when a fatal error occurs
      *  on the connection, for example because data received from RabbitMQ
      *  could not be recognized.
      *  @param  connection      The connection on which the error occurred
      *  @param  message         A human readable error message
      */
      virtual void onError(AMQP::Connection *connection, const std::string &message)
      {
         // @todo
         //  add your own implementation, for example by reporting the error
         //  to the user of your program, log the error, and destruct the
         //  connection object because it is no longer in a usable state
      }
  };

}

...