...
Code Block | ||||
---|---|---|---|---|
| ||||
private File processFile(File tempFile, String originalFileId) throws Exception { Runtime r = Runtime.getRuntime(); Process p; // Process tracks one external native process String tempDir = System.getProperty("java.io.tmpdir"); if (new Character(tempDir.charAt(tempDir.length()-1)).toString().equals(System.getProperty("file.separator")) == false){ tempDir = tempDir + System.getProperty("file.separator"); } String processCmd = ""; String operSystem = System.getProperty("os.name").toLowerCase(); // TODO: windows impl if(operSystem.indexOf("nix") >= 0 || operSystem.indexOf("nux") >= 0 || operSystem.indexOf("aix") > 0 ){ "wc -w " + tempDir + tempFile.getName(); } p = r.exec(processCmd, null, new File(tempDir)); StreamGobbler outputGobbler = new StreamGobbler(p.getInputStream(), "INFO", log); StreamGobbler errorGobbler = new StreamGobbler(p.getErrorStream(),"ERROR", log); outputGobbler.start(); errorGobbler.start(); p.waitFor(); File outFile = new File(tempDir + tempFile.getName().substring(0, tempFile.getName().lastIndexOf(".")) + ".txt"); tempFile.delete(); if(!Files.exists(outFile.toPath())) throw new Exception("File not processed correctly. File is possibly corrupt."); return outFile; } |
Anchor | ||||
---|---|---|---|---|
|
Code Block | ||||
---|---|---|---|---|
| ||||
#include <amqpcpp.h> namespace CPPExample { class RabbitMQConnectionHandler : public AMQP::ConnectionHandler { /** * Method that is called by the AMQP library every time it has data * available that should be sent to RabbitMQ. * @param connection pointer to the main connection object * @param data memory buffer with the data that should be sent to RabbitMQ * @param size size of the buffer */ virtual void onData(AMQP::Connection *connection, const char *data, size_t size) { // @todo // Add your own implementation, for example by doing a call to the // send() system call. But be aware that the send() call may not // send all data at once, so you also need to take care of buffering // the bytes that could not immediately be sent, and try to send // them again when the socket becomes writable again } /** * Method that is called by the AMQP library when the login attempt * succeeded. After this method has been called, the connection is ready * to use. * @param connection The connection that can now be used */ virtual void onConnected(Connection *connection) { // @todo // add your own implementation, for example by creating a channel // instance, and start publishing or consuming } /** * Method that is called by the AMQP library when a fatal error occurs * on the connection, for example because data received from RabbitMQ * could not be recognized. * @param connection The connection on which the error occured * @param message A human readable error message */ virtual void onError(Connection *connection, const std::string &message) { // @todo // add your own implementation, for example by reporting the error // to the user of your program, log the error, and destruct the // connection object because it is no longer in a usable state } }; } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
def on_message(channel, method, header, body): global logger statusreport = {} inputfile=None try: # parse body back from json jbody=json.loads(body) host=jbody['host'] fileid=jbody['id'] intermediatefileid=jbody['intermediateId'] if not (host.endswith('/')): host += '/' # for status reports statusreport['file_id'] = fileid statusreport['extractor_id'] = 'wordCount' # print what we are doing logger.debug("[%s] started processing", fileid) # fetch data statusreport['status'] = 'Downloading file.' statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S') channel.basic_publish(exchange='', routing_key=header.reply_to, properties=pika.BasicProperties(correlation_id = \ header.correlation_id), body=json.dumps(statusreport)) url=host + 'api/files/' + intermediatefileid + '?key=' + playserverKey r=requests.get(url, stream=True) r.raise_for_status() (fd, inputfile)=tempfile.mkstemp() with os.fdopen(fd, "w") as f: for chunk in r.iter_content(chunk_size=10*1024): f.write(chunk) # create word count statusreport['status'] = 'Creating word count.' statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S') channel.basic_publish(exchange='', routing_key=header.reply_to, properties=pika.BasicProperties(correlation_id = \ header.correlation_id), body=json.dumps(statusreport)) create_word_count(inputfile, ext, host, fileid) # Ack channel.basic_ack(method.delivery_tag) logger.debug("[%s] finished processing", fileid) except subprocess.CalledProcessError as e: logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output) statusreport['status'] = 'Error processing.' statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S') channel.basic_publish(exchange='', routing_key=header.reply_to, properties=pika.BasicProperties(correlation_id = \ header.correlation_id), body=json.dumps(statusreport)) except: logger.exception("[%s] error processing", fileid) statusreport['status'] = 'Error processing.' statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S') channel.basic_publish(exchange='', routing_key=header.reply_to, properties=pika.BasicProperties(correlation_id = \ header.correlation_id), body=json.dumps(statusreport)) finally: statusreport['status'] = 'DONE.' statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S') channel.basic_publish(exchange='', routing_key=header.reply_to, properties=pika.BasicProperties(correlation_id = \ header.correlation_id), body=json.dumps(statusreport)) if inputfile is not None: try: os.remove(inputfile) except OSError: pass except UnboundLocalError: pass |
Calling R Scripts from Python
Code Block | ||||
---|---|---|---|---|
| ||||
def create_word_count(inputfile, ext, host, fileid): global logger (fd, inputfile)=tempfile.mkstemp(suffix='.' + ext) try: # make syscall to wc subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT) if(os.path.getsize(wcfile) == 0): raise Exception("File is empty.") # upload word count file |
Calling R Scripts from Python
AnchorVersus Versus
Versus Extractors
Versus | |
Versus |
...