...
Code Block | ||||
---|---|---|---|---|
| ||||
public File processJob(ExtractionJob receivedMsg) throws Exception{ log.info("Downloading text file with ID "+ receivedMsg.getIntermediateId() +" from " + receivedMsg.getHost()); callingExtractor.sendStatus(receivedMsg.getId(), callingExtractor.getClass().getSimpleName(), "Downloading text file.", log); DefaultHttpClient httpclient = new DefaultHttpClient(); HttpGet httpGet = new HttpGet(receivedMsg.getHost() +"api/files/"+ receivedMsg.getIntermediateId()+"?key="+playserverKey); HttpResponse fileResponse = httpclient.execute(httpGet); log.info(fileResponse.getStatusLine()); if(fileResponse.getStatusLine().toString().indexOf("200") == -1){ throw new IOException("File not found."); } HttpEntity fileEntity = fileResponse.getEntity(); InputStream fileIs = fileEntity.getContent(); Header[] hdrs = fileResponse.getHeaders("content-disposition"); String contentDisp = hdrs[0].toString(); String fileName = contentDisp.substring(contentDisp.indexOf("filename=")+9); File tempFile = File.createTempFile(fileName.substring(0, fileName.lastIndexOf(".")), fileName.substring(fileName.lastIndexOf(".")).toLowerCase()); OutputStream fileOs = new FileOutputStream(tempFile); IOUtils.copy(fileIs,fileOs); fileIs.close(); fileOs.close(); EntityUtils.consume(fileEntity); log.info("Download complete. Initiating word count generation"); File textFile = processFile(tempFile, receivedMsg.getId()); return textFile; } |
Code Block | ||||
---|---|---|---|---|
| ||||
private File processFile(File tempFile, String originalFileId) throws Exception {
Runtime r = Runtime.getRuntime();
Process p; // Process tracks one external native process
String tempDir = System.getProperty("java.io.tmpdir");
if (new Character(tempDir.charAt(tempDir.length()-1)).toString().equals(System.getProperty("file.separator")) == false){
tempDir = tempDir + System.getProperty("file.separator");
}
String processCmd = "";
String operSystem = System.getProperty("os.name").toLowerCase();
// TODO: windows impl
if(operSystem.indexOf("nix") >= 0 || operSystem.indexOf("nux") >= 0 || operSystem.indexOf("aix") > 0 ){
"wc -w " + tempDir + tempFile.getName();
}
p = r.exec(processCmd, null, new File(tempDir));
StreamGobbler outputGobbler = new StreamGobbler(p.getInputStream(), "INFO", log);
StreamGobbler errorGobbler = new StreamGobbler(p.getErrorStream(),"ERROR", log);
outputGobbler.start();
errorGobbler.start();
p.waitFor();
File outFile = new File(tempDir + tempFile.getName().substring(0, tempFile.getName().lastIndexOf(".")) + ".txt");
tempFile.delete();
if(!Files.exists(outFile.toPath()))
throw new Exception("File not processed correctly. File is possibly corrupt.");
return outFile;
} |
Anchor | ||||
---|---|---|---|---|
|
Code Block | ||||
---|---|---|---|---|
| ||||
#include <amqpcpp.h> namespace CPPExample { class RabbitMQConnectionHandler : public AMQP::ConnectionHandler { /** * Method that is called by the AMQP library every time it has data * available that should be sent to RabbitMQ. * @param connection pointer to the main connection object * @param data memory buffer with the data that should be sent to RabbitMQ * @param size size of the buffer */ virtual void onData(AMQP::Connection *connection, const char *data, size_t size) { // @todo // Add your own implementation, for example by doing a call to the // send() system call. But be aware that the send() call may not // send all data at once, so you also need to take care of buffering // the bytes that could not immediately be sent, and try to send // them again when the socket becomes writable again } /** * Method that is called by the AMQP library when the login attempt * succeeded. After this method has been called, the connection is ready * to use. * @param connection The connection that can now be used */ virtual void onConnected(Connection *connection) { // @todo // add your own implementation, for example by creating a channel // instance, and start publishing or consuming } /** * Method that is called by the AMQP library when a fatal error occurs * on the connection, for example because data received from RabbitMQ * could not be recognized. * @param connection The connection on which the error occured * @param message A human readable error message */ virtual void onError(Connection *connection, const std::string &message) { // @todo // add your own implementation, for example by reporting the error // to the user of your program, log the error, and destruct the // connection object because it is no longer in a usable state } }; } |
...