...
Code Block | ||||
---|---|---|---|---|
| ||||
private File processFile(File tempFile, String originalFileId) throws Exception { Runtime r = Runtime.getRuntime(); Process p; // Process tracks one external native process String tempDir = System.getProperty("java.io.tmpdir"); if (new Character(tempDir.charAt(tempDir.length()-1)).toString().equals(System.getProperty("file.separator")) == false){ tempDir = tempDir + System.getProperty("file.separator"); } String processCmd = ""; String operSystem = System.getProperty("os.name").toLowerCase(); // TODO: windows impl if(operSystem.indexOf("nix") >= 0 || operSystem.indexOf("nux") >= 0 || operSystem.indexOf("aix") > 0 ){ "wc -w " + tempDir + tempFile.getName(); } p = r.exec(processCmd, null, new File(tempDir)); StreamGobbler outputGobbler = new StreamGobbler(p.getInputStream(), "INFO", log); StreamGobbler errorGobbler = new StreamGobbler(p.getErrorStream(),"ERROR", log); outputGobbler.start(); errorGobbler.start(); p.waitFor(); File outFile = new File(tempDir + tempFile.getName().substring(0, tempFile.getName().lastIndexOf(".")) + ".txt"); tempFile.delete(); if(!Files.exists(outFile.toPath())) throw new Exception("File not processed correctly. File is possibly corrupt."); return outFile; } |
Anchor | ||||
---|---|---|---|---|
|
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
def main():
    """Connect to RabbitMQ and consume ``*.file.text.plain`` events until CTRL+C.

    Command line::

        <script> <rabbitmq-user> <rabbitmq-password> <medici-api-key> [<exchange-name>]

    The exchange name defaults to ``'medici'`` when omitted. Sets the
    module-level globals ``logger``, ``playserverKey`` (read by
    ``on_message``) and ``exchange_name``. Exits with status 1 on a wrong
    number of arguments. The RabbitMQ connection is always closed on return.
    """
    global logger
    global playserverKey
    global exchange_name
    # name of receiver; also used as the queue name
    receiver = 'ExamplePythonExtractor'
    # configure the logging system
    logging.basicConfig(format="%(asctime)-15s %(name)-10s %(levelname)-7s : %(message)s", level=logging.WARN)
    logger = logging.getLogger(receiver)
    logger.setLevel(logging.DEBUG)
    # argv[0] is the script: 3 required arguments plus an optional exchange name.
    if len(sys.argv) not in (4, 5):
        logger.error("Input RabbitMQ username, followed by RabbitMQ password, "
                     "Medici REST API key and (optionally) exchange name.")
        sys.exit(1)
    playserverKey = sys.argv[3]
    exchange_name = sys.argv[4] if len(sys.argv) == 5 else 'medici'
    # connect to rabbitmq using input username and password (host/port: pika defaults)
    credentials = pika.PlainCredentials(sys.argv[1], sys.argv[2])
    parameters = pika.ConnectionParameters(credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        # declare a durable topic exchange and a durable queue, then bind them
        channel.exchange_declare(exchange=exchange_name, exchange_type='topic', durable=True)
        channel.queue_declare(queue=receiver, durable=True)
        channel.queue_bind(queue=receiver, exchange=exchange_name, routing_key='*.file.text.plain')
        # manual acks: on_message acks a delivery itself.
        # NOTE: pika 0.x call style; pika >= 1.0 uses basic_consume(queue, on_message_callback, auto_ack).
        channel.basic_consume(on_message, queue=receiver, no_ack=False)
        logger.info("Waiting for messages. To exit press CTRL+C")
        try:
            channel.start_consuming()
        except KeyboardInterrupt:
            channel.stop_consuming()
    finally:
        # close connection even if setup or consuming fails
        connection.close()
Code Block | ||||
---|---|---|---|---|
| ||||
def _publish_status(channel, header, statusreport, status):
    """Stamp *statusreport* with *status* and the current time, then publish it as a reply.

    Mutates *statusreport* in place (``'status'`` and ``'start'`` keys) and
    publishes it as JSON on the default exchange, routed to
    ``header.reply_to`` with the request's correlation id.
    """
    statusreport['status'] = status
    statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
    channel.basic_publish(exchange='',
                          routing_key=header.reply_to,
                          properties=pika.BasicProperties(correlation_id=header.correlation_id),
                          body=json.dumps(statusreport))


def on_message(channel, method, header, body):
    """Handle one file event: download the file, create its word count, report status.

    *body* is JSON with at least ``host``, ``id`` and ``intermediateId``.
    Status reports are sent to ``header.reply_to`` at each stage; on any
    failure an 'Error processing.' report is sent, and a final 'DONE.' report
    is always sent. The delivery is acked only after successful processing.
    The downloaded temporary file is always removed.
    """
    global logger
    statusreport = {}
    inputfile = None
    # bound before parsing so the error handlers can always log it
    fileid = None
    try:
        # parse body back from json
        jbody = json.loads(body)
        host = jbody['host']
        fileid = jbody['id']
        intermediatefileid = jbody['intermediateId']
        # NOTE(review): create_word_count takes an extension, but the message schema is not
        # visible here; read 'ext' if present, else derive it from 'filename' — confirm.
        ext = jbody.get('ext') or os.path.splitext(jbody.get('filename', ''))[1]
        if not host.endswith('/'):
            host += '/'
        # for status reports
        statusreport['file_id'] = fileid
        statusreport['extractor_id'] = 'wordCount'
        logger.debug("[%s] started processing", fileid)

        # fetch data
        _publish_status(channel, header, statusreport, 'Downloading file.')
        url = host + 'api/files/' + intermediatefileid + '?key=' + playserverKey
        r = requests.get(url, stream=True)
        try:
            r.raise_for_status()
            (fd, inputfile) = tempfile.mkstemp()
            # binary mode: iter_content yields bytes
            with os.fdopen(fd, "wb") as f:
                for chunk in r.iter_content(chunk_size=10 * 1024):
                    f.write(chunk)
        finally:
            # with stream=True the connection is held until the response is closed
            r.close()

        # create word count
        _publish_status(channel, header, statusreport, 'Creating word count.')
        create_word_count(inputfile, ext, host, fileid)
        # Ack only on success. NOTE(review): failed deliveries are never acked/nacked here.
        channel.basic_ack(method.delivery_tag)
        logger.debug("[%s] finished processing", fileid)
    except subprocess.CalledProcessError as e:
        logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output)
        _publish_status(channel, header, statusreport, 'Error processing.')
    except Exception:
        logger.exception("[%s] error processing", fileid)
        _publish_status(channel, header, statusreport, 'Error processing.')
    finally:
        _publish_status(channel, header, statusreport, 'DONE.')
        if inputfile is not None:
            try:
                os.remove(inputfile)
            except OSError:
                # best-effort cleanup of the temporary download
                pass
...
language | py |
---|---|
title | Example Job: Word Count |
...
A complete example of a Python extractor can be found at Medici Extractor in Python.
Calling R Scripts from Python
...