Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleProcessing Messages Received From RabbitMQ
protected void processMessageReceived() {
  try {
    try {
      ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this);
      jobReceived = getRepresentation(messageReceived, ExtractionJob.class);        
    
      File textFile = extrServ.processJob(jobReceived);
        
      jobReceived.setFlag("wasText");

      log.info("Word count extraction complete. Returning word count file as intermediate result.");
      sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log);
        
      uploadIntermediate(textFile, "text/plain", log);
        
      textFile.delete();
        
      sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);

    } catch (Exception ioe) {
      log.error("Could not finish extraction job.", ioe);
      sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log);
      sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log);
    }
  } catch(Exception e) {
      e.printStackTrace();
      System.exit(1);
  } 
}

 

...

Code Block
languagecppjava
titleConnecting to RabbitMQProcessing the Job
public File processJob(ExtractionJob receivedMsg) throws Exception{#include <amqpcpp.h>

namespace CPPExample {

  class RabbitMQConnectionHandler : public AMQP::ConnectionHandler {
  log.info("Downloading text file with /**
      *  Method that is called by the AMQP library every time it has data
      *  available that should be sent to RabbitMQ. 
      *  @param  connection  pointer to the main connection object  
      *  @param  data        memory buffer with the data that should be sent to RabbitMQ
      *  @param  size        size of the buffer
      */
     virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
     {
         // @todoID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost());
  callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log);
      
  DefaultHttpClient httpclient = new DefaultHttpClient();
  HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey);    
  HttpResponse fileResponse = httpclient.execute(httpGet);
  log.info(fileResponse.getStatusLine());
  if(fileResponse.getStatusLine().toString().indexOf("200") == -1){
    throw new IOException("File not found.");
  }
  HttpEntity fileEntity = fileResponse.getEntity();
  InputStream fileIs = fileEntity.getContent();
  
  Header[] hdrs = fileResponse.getHeaders("content-disposition");
  String contentDisp = hdrs[0].toString();
  
  String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9);
  File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")),      fileName.substring(fileName.lastIndexOf(".")).toLowerCase());
  OutputStream fileOs = new FileOutputStream(tempFile);   
  IOUtils.copy(fileIs,fileOs);
  fileIs.close();
  fileOs.close();
      
  EntityUtils.consume(fileEntity); 
      
  
  log.info("Download complete. Initiating word count generation");
  
  File textFile = processFile(tempFile, receivedMsg.getId());  
  return textFile;       
}

 

 

Anchor
C++
C++
C++

Code Block
languagecpp
titleConnecting to RabbitMQ
#include <amqpcpp.h>

namespace CPPExample {

  class RabbitMQConnectionHandler : public AMQP::ConnectionHandler {//  Add your own implementation, for example by doing a call to the
         //  send() system call. But be aware that the send() call may not
      /**
   //  send all* data atMethod once,that sois youcalled alsoby needthe toAMQP takelibrary careevery oftime buffering
it has data
      * //  the bytes available that couldshould not immediately be sent, and try to sendRabbitMQ. 
      *  @param // connection them againpointer whento the socketmain connection becomesobject writable again
     }

 *  @param  data  /**
      *memory  Method that is called bybuffer with the AMQPdata librarythat whenshould thebe loginsent attemptto RabbitMQ
      *  succeeded. After this method has been called, the connection is ready @param  size        size of the buffer
      */
  to use.
  virtual void onData(AMQP::Connection *connection, const char *data,  @paramsize_t size)
  connection   {
   The connection that can now be used
      */
      virtual void onConnected(Connection *connection)
      {
         // @todo 
         //  addAdd your own implementation, for example by creatingdoing a call channelto the
         //  instance, and start publishing or consuming
      }

send() system call. But be aware that the send() call may not
      /**
   //  send *all data Methodat thatonce, isso calledyou byalso theneed AMQPto librarytake whencare aof fatalbuffering
 error occurs
      * // on the connection,bytes forthat examplecould becausenot dataimmediately receivedbe fromsent, RabbitMQ
and try to send 
  *  could not be recognized.
  //  them again *when the @paramsocket becomes connectionwritable again
     The}

 connection on which the error occured/**
      *  @paramMethod that messageis called by the AMQP library when the login A human readable error messageattempt 
      */
  succeeded. After this method has virtualbeen void onError(Connection *connection, const std::string &message)called, the connection is ready 
      {
*  to use.
     // @todo
*  @param  connection    //  addThe yourconnection own implementation, for example by reporting the errorthat can now be used
      */
      virtual  //void onConnected(Connection *connection)
    to the user{
 of your program, log the error, and destruct the// @todo
         //  connectionadd objectyour becauseown itimplementation, isfor noexample longerby increating a usablechannel state
      }
  };

}
Code Block
languagejava
titleProcessing the Job
public File processJob(ExtractionJob receivedMsg) throws Exception{   //  instance, and start publishing or consuming
      }

  log.info("Downloading text file with ID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost());
  callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log);
      
  DefaultHttpClient httpclient = new DefaultHttpClient();
  HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey);    
  HttpResponse fileResponse = httpclient.execute(httpGet);
  log.info(fileResponse.getStatusLine());
  if(fileResponse.getStatusLine().toString().indexOf("200") == -1){
    throw new IOException("File not found.");
  }
  HttpEntity fileEntity = fileResponse.getEntity();
  InputStream fileIs = fileEntity.getContent();
  
  Header[] hdrs = fileResponse.getHeaders("content-disposition");
  String contentDisp = hdrs[0].toString();
  
  String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9);
  File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")),      fileName.substring(fileName.lastIndexOf(".")).toLowerCase());
  OutputStream fileOs = new FileOutputStream(tempFile);   
  IOUtils.copy(fileIs,fileOs);
  fileIs.close();
  fileOs.close();
      
  EntityUtils.consume(fileEntity); 
      
  
  log.info("Download complete. Initiating word count generation");
  
  File textFile = processFile(tempFile, receivedMsg.getId());  
  return textFile;           /**
      *  Method that is called by the AMQP library when a fatal error occurs
      *  on the connection, for example because data received from RabbitMQ
      *  could not be recognized.
      *  @param  connection      The connection on which the error occured
      *  @param  message         A human readable error message
      */
      virtual void onError(Connection *connection, const std::string &message)
      {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the 
        //  connection object because it is no longer in a usable state
      }
  };

}
Code Block
languagecpp
titleReceiver
namespace CPPExample {

  /**
   *  Parse data that was recevied from RabbitMQ
   *  
   *  Every time that data comes in from RabbitMQ, you should call this method to parse
   *  the incoming data, and let it handle by the AMQP-CPP library. This method returns the number
   *  of bytes that were processed.
   *
   *  If not all bytes could be processed because it only contained a partial frame, you should
   *  call this same method later on when more data is available. The AMQP-CPP library does not do
   *  any buffering, so it is up to the caller to ensure that the old data is also passed in that
   *  later call.
   *
   *  @param  buffer      buffer to decode
   *  @param  size        size of the buffer to decode
   *  @return             number of bytes that were processed
   */
  size_t parse(char *buffer, size_t size)
  {
     return _implementation.parse(buffer, size);
  }
}

...