...
Code Block | ||||
---|---|---|---|---|
| ||||
protected void processMessageReceived() { try { try { ExampleJavaExtractorService extrServ = new ExampleJavaExtractorService(this); jobReceived = getRepresentation(messageReceived, ExtractionJob.class); File textFile = extrServ.processJob(jobReceived); jobReceived.setFlag("wasText"); log.info("Word count extraction complete. Returning word count file as intermediate result."); sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Word count extraction complete. Returning word count file as intermediate result.", log); uploadIntermediate(textFile, "text/plain", log); textFile.delete(); sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log); } catch (Exception ioe) { log.error("Could not finish extraction job.", ioe); sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "Could not finish extraction job.", log); sendStatus(jobReceived.getId(), this.getClass().getSimpleName(), "DONE.", log); } } catch(Exception e) { e.printStackTrace(); System.exit(1); } } |
...
Code Block | |||||
---|---|---|---|---|---|
| |||||
public File processJob(ExtractionJob receivedMsg) throws Exception{#include <amqpcpp.h> namespace CPPExample { class RabbitMQConnectionHandler : public AMQP::ConnectionHandler { log.info("Downloading text file with /** * Method that is called by the AMQP library every time it has data * available that should be sent to RabbitMQ. * @param connection pointer to the main connection object * @param data memory buffer with the data that should be sent to RabbitMQ * @param size size of the buffer */ virtual void onData(AMQP::Connection *connection, const char *data, size_t size) { // @todoID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost()); callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log); DefaultHttpClient httpclient = new DefaultHttpClient(); HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey); HttpResponse fileResponse = httpclient.execute(httpGet); log.info(fileResponse.getStatusLine()); if(fileResponse.getStatusLine().toString().indexOf("200") == -1){ throw new IOException("File not found."); } HttpEntity fileEntity = fileResponse.getEntity(); InputStream fileIs = fileEntity.getContent(); Header[] hdrs = fileResponse.getHeaders("content-disposition"); String contentDisp = hdrs[0].toString(); String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9); File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")), fileName.substring(fileName.lastIndexOf(".")).toLowerCase()); OutputStream fileOs = new FileOutputStream(tempFile); IOUtils.copy(fileIs,fileOs); fileIs.close(); fileOs.close(); EntityUtils.consume(fileEntity); log.info("Download complete. Initiating word count generation"); File textFile = processFile(tempFile, receivedMsg.getId()); return textFile; } |
Anchor | ||||
---|---|---|---|---|
|
Code Block | ||||
---|---|---|---|---|
| ||||
#include <amqpcpp.h>

#include <cstddef>
#include <string>

namespace CPPExample {

/**
 *  Skeleton connection handler: every callback is a placeholder that the
 *  application is expected to fill in.
 */
class RabbitMQConnectionHandler : public AMQP::ConnectionHandler
{
    /**
     *  Method that is called by the AMQP library every time it has data
     *  available that should be sent to RabbitMQ.
     *  @param  connection  pointer to the main connection object
     *  @param  data        memory buffer with the data that should be sent to RabbitMQ
     *  @param  size        size of the buffer
     */
    virtual void onData(AMQP::Connection *connection, const char *data, size_t size)
    {
        // @todo
        //  Add your own implementation, for example by doing a call to the
        //  send() system call. But be aware that the send() call may not
        //  send all data at once, so you also need to take care of buffering
        //  the bytes that could not immediately be sent, and try to send
        //  them again when the socket becomes writable again
    }

    /**
     *  Method that is called by the AMQP library when the login attempt
     *  succeeded. After this method has been called, the connection is ready
     *  to use.
     *  @param  connection  The connection that can now be used
     */
    virtual void onConnected(AMQP::Connection *connection)
    {
        // @todo
        //  add your own implementation, for example by creating a channel
        //  instance, and start publishing or consuming
    }

    /**
     *  Method that is called by the AMQP library when a fatal error occurs
     *  on the connection, for example because data received from RabbitMQ
     *  could not be recognized.
     *  @param  connection  The connection on which the error occurred
     *  @param  message     A human readable error message
     */
    virtual void onError(AMQP::Connection *connection, const std::string &message)
    {
        // @todo
        //  add your own implementation, for example by reporting the error
        //  to the user of your program, log the error, and destruct the
        //  connection object because it is no longer in a usable state
    }
};

}
Code Block | ||||
| ||||
public File processJob(ExtractionJob receivedMsg) throws Exception{ // instance, and start publishing or consuming } log.info("Downloading text file with ID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost()); callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log); DefaultHttpClient httpclient = new DefaultHttpClient(); HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey); HttpResponse fileResponse = httpclient.execute(httpGet); log.info(fileResponse.getStatusLine()); if(fileResponse.getStatusLine().toString().indexOf("200") == -1){ throw new IOException("File not found."); } HttpEntity fileEntity = fileResponse.getEntity(); InputStream fileIs = fileEntity.getContent(); Header[] hdrs = fileResponse.getHeaders("content-disposition"); String contentDisp = hdrs[0].toString(); String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9); File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")), fileName.substring(fileName.lastIndexOf(".")).toLowerCase()); OutputStream fileOs = new FileOutputStream(tempFile); IOUtils.copy(fileIs,fileOs); fileIs.close(); fileOs.close(); EntityUtils.consume(fileEntity); log.info("Download complete. Initiating word count generation"); File textFile = processFile(tempFile, receivedMsg.getId()); return textFile; /** * Method that is called by the AMQP library when a fatal error occurs * on the connection, for example because data received from RabbitMQ * could not be recognized. 
* @param connection The connection on which the error occured * @param message A human readable error message */ virtual void onError(Connection *connection, const std::string &message) { // @todo // add your own implementation, for example by reporting the error // to the user of your program, log the error, and destruct the // connection object because it is no longer in a usable state } }; } |
Code Block | ||||
---|---|---|---|---|
| ||||
namespace CPPExample {

/**
 *  Hand bytes received from RabbitMQ to the AMQP-CPP library for parsing.
 *
 *  Call this whenever data arrives from RabbitMQ. The return value tells how
 *  many bytes were processed.
 *
 *  When the buffer ends in a partial frame, not every byte can be processed.
 *  The AMQP-CPP library does no buffering of its own, so the caller must keep
 *  the unprocessed bytes and pass them in again, together with the newly
 *  received data, on a later call.
 *
 *  @param  buffer  buffer to decode
 *  @param  size    size of the buffer to decode
 *  @return         number of bytes that were processed
 */
size_t parse(char *buffer, size_t size)
{
    // pure delegation: the implementation object does the actual decoding
    const size_t processed = _implementation.parse(buffer, size);
    return processed;
}

}
...