Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagepy
titleOn Message
def _publish_status(channel, header, statusreport, status):
	"""Stamp *statusreport* with *status* and the current local time, then
	publish it as JSON to the queue named by ``header.reply_to``."""
	statusreport['status'] = status
	statusreport['start'] = time.strftime('%Y-%m-%dT%H:%M:%S')
	channel.basic_publish(
		exchange='',
		routing_key=header.reply_to,
		properties=pika.BasicProperties(correlation_id=header.correlation_id),
		body=json.dumps(statusreport))


def on_message(channel, method, header, body):
	"""Handle one extraction request delivered on *channel*.

	*body* is a JSON document with the keys ``host``, ``id`` and
	``intermediateId``.  The handler downloads
	``<host>api/files/<intermediateId>`` into a temporary file, passes that
	file to ``create_word_count`` and acknowledges the delivery on success.

	Progress is reported by publishing the status report to
	``header.reply_to`` (tagged with ``header.correlation_id``) before each
	step.  On failure the error is logged, an 'Error processing.' status is
	published and the delivery is NOT acknowledged.  In every case a final
	'DONE.' status is published and the temporary file is removed.

	Parameters:
		channel -- object providing ``basic_publish`` and ``basic_ack``
		method  -- delivery info; only ``delivery_tag`` is read
		header  -- message properties; ``reply_to`` and ``correlation_id``
		           are read
		body    -- JSON-encoded request
	"""
	statusreport = {}
	inputfile = None
	# Bound before the try block so the error handlers can always log it,
	# even when the body cannot be parsed.
	fileid = None
	try:
		# parse body back from json
		jbody = json.loads(body)

		host = jbody['host']
		fileid = jbody['id']
		intermediatefileid = jbody['intermediateId']
		if not host.endswith('/'):
			host += '/'

		# for status reports
		statusreport['file_id'] = fileid
		statusreport['extractor_id'] = 'wordCount'

		# print what we are doing
		logger.debug("[%s] started processing", fileid)

		# fetch data
		_publish_status(channel, header, statusreport, 'Downloading file.')
		url = host + 'api/files/' + intermediatefileid + '?key=' + playserverKey
		r = requests.get(url, stream=True)
		r.raise_for_status()
		(fd, inputfile) = tempfile.mkstemp()
		# Binary mode: the response is streamed as raw byte chunks, which a
		# text-mode file would reject (Python 3) or newline-translate.
		with os.fdopen(fd, "wb") as f:
			for chunk in r.iter_content(chunk_size=10*1024):
				f.write(chunk)

		# create word count
		_publish_status(channel, header, statusreport, 'Creating word count.')
		# NOTE(review): `ext` is not defined in this function; presumably a
		# module-level value -- confirm it exists where this code is used.
		create_word_count(inputfile, ext, host, fileid)

		# Ack
		channel.basic_ack(method.delivery_tag)
		logger.debug("[%s] finished processing", fileid)
	except subprocess.CalledProcessError as e:
		logger.exception("[%s] error processing [exit code=%d]\n%s", fileid, e.returncode, e.output)
		_publish_status(channel, header, statusreport, 'Error processing.')
	except Exception:
		# Deliberately not a bare except: KeyboardInterrupt/SystemExit must
		# still be able to stop the consumer.
		logger.exception("[%s] error processing", fileid)
		_publish_status(channel, header, statusreport, 'Error processing.')
	finally:
		# Clean up first so a failing publish cannot leak the temp file.
		if inputfile is not None:
			try:
				os.remove(inputfile)
			except OSError:
				pass
		_publish_status(channel, header, statusreport, 'DONE.')

 

Calling R Scripts from Python

 

Code Block
languagepy
titleCreate Word Count
def create_word_count(inputfile, ext, host, fileid):
	global logger

	(fd, inputfile)=tempfile.mkstemp(suffix='.' + ext)
	try:
		# make syscall to wc
		subprocess.check_output(['wc', inputfile], stderr=subprocess.STDOUT)

		if(os.path.getsize(wcfile) == 0):
			raise Exception("File is empty.")

		# upload word count file

...