def _publish_status(channel, header, statusreport, status):
    """Stamp *statusreport* with *status* and the current time, then publish it.

    The report is mutated in place ('status' and 'start' keys) and sent as
    JSON on the default exchange, routed to ``header.reply_to`` and tagged
    with ``header.correlation_id``.
    """
    statusreport['status'] = status
    statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
    channel.basic_publish(exchange='',
                          routing_key=header.reply_to,
                          properties=pika.BasicProperties(
                              correlation_id=header.correlation_id),
                          body=json.dumps(statusreport))


def on_message(channel, method, header, body):
    """Handle one queue message: download the referenced file and word-count it.

    ``body`` is a JSON document that must contain the keys 'host', 'id' and
    'intermediateId'. The file is fetched from
    ``<host>/api/files/<intermediateId>?key=<playserverKey>`` into a temporary
    file, handed to ``create_word_coundcount`` and the temporary file is
    removed afterwards.

    Status reports ('Downloading file.', 'Creating word count.',
    'Error processing.', 'DONE.') are published to ``header.reply_to`` as the
    work progresses; 'DONE.' is always sent, even after a failure.

    The message is acknowledged (``channel.basic_ack``) only when processing
    succeeds. Failures are logged and reported, not re-raised.

    Parameters:
        channel: object providing ``basic_publish`` and ``basic_ack``.
        method:  object providing ``delivery_tag`` for the acknowledgement.
        header:  object providing ``reply_to`` and ``correlation_id``.
        body:    JSON-encoded request payload.
    """
    statusreport = {}
    inputfile = None
    # Bound up front so the error handlers can log it even when the body
    # could not be parsed.
    fileid = None
    try:
        # parse body back from json
        jbody = json.loads(body)
        host = jbody['host']
        fileid = jbody['id']
        intermediatefileid = jbody['intermediateId']
        if not host.endswith('/'):
            host += '/'

        # for status reports
        statusreport['file_id'] = fileid
        statusreport['extractor_id'] = 'wordCount'

        # print what we are doing
        logger.debug("[%s] started processing", fileid)

        # fetch data
        _publish_status(channel, header, statusreport, 'Downloading file.')
        url = host + 'api/files/' + intermediatefileid + '?key=' + playserverKey
        r = requests.get(url, stream=True)
        try:
            r.raise_for_status()
            (fd, inputfile) = tempfile.mkstemp()
            # iter_content yields bytes, so the file must be opened in
            # binary mode.
            with os.fdopen(fd, "wb") as f:
                for chunk in r.iter_content(chunk_size=10*1024):
                    f.write(chunk)
        finally:
            # Release the streamed connection even if the download failed.
            r.close()

        # create word count
        _publish_status(channel, header, statusreport, 'Creating word count.')
        create_word_coundcount(inputfile, ext, host, fileid)

        # Ack
        channel.basic_ack(method.delivery_tag)
        logger.debug("[%s] finished processing", fileid)
    except subprocess.CalledProcessError as e:
        logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output)
        _publish_status(channel, header, statusreport, 'Error processing.')
    except Exception:
        # Boundary handler: log and report, but let KeyboardInterrupt and
        # SystemExit propagate so the consumer can be stopped.
        logger.exception("[%s] error processing", fileid)
        _publish_status(channel, header, statusreport, 'Error processing.')
    finally:
        try:
            _publish_status(channel, header, statusreport, 'DONE.')
        finally:
            # Remove the temporary file even if the final publish failed.
            if inputfile is not None:
                try:
                    os.remove(inputfile)
                except OSError:
                    pass