...
Code Block |
---|
|
/**
 * Starts the extractor: connects to RabbitMQ, declares and binds this extractor's queue,
 * and hands every delivered message to {@code WordCount.onMessage}.
 * The method loops forever; stop the process with CTRL+C.
 *
 * @param argv command-line arguments (ignored)
 * @throws Exception if connecting to RabbitMQ or declaring/binding the exchange or queue
 *                   fails, or if the main thread is interrupted while waiting
 */
public static void main(String[] argv) throws Exception {
    // setup connection parameters
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(rabbitmqhost);
    // only override the client's default credentials when both values are configured
    if ((username != null) && (password != null)) {
        factory.setUsername(username);
        factory.setPassword(password);
    }
    // connect to RabbitMQ
    Connection connection = factory.newConnection();
    // open a channel on the connection
    final Channel channel = connection.createChannel();
    // declare the durable topic exchange
    channel.exchangeDeclare(exchange, "topic", true);
    // declare a durable, non-exclusive, non-auto-delete queue named after the extractor
    channel.queueDeclare(extractorName, true, false, false, null);
    // bind the queue to the exchange using messageType as the binding key
    channel.queueBind(extractorName, exchange, messageType);
    // consume with manual acknowledgements (autoAck=false); onMessage acks after success
    channel.basicConsume(extractorName, false, "", new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties header, byte[] body) throws IOException {
            WordCount wc = new WordCount();
            // decode explicitly as UTF-8: onMessage parses the body as JSON, and the
            // platform default charset can differ from machine to machine
            wc.onMessage(channel, envelope.getDeliveryTag(), header, new String(body, "UTF-8"));
        }
    });
    // keep the main thread alive; deliveries are dispatched on the client's consumer threads
    logger.info("[*] Waiting for messages. To exit press CTRL+C");
    while (true) {
        Thread.sleep(1000);
    }
} |
Next is the function that handles a message. It calls a function to download the data and then processes the data. This function acks the message on the message bus only if downloadFile and processFile both complete without errors. The ack tells RabbitMQ that the file has been processed and that nobody else should process it again. If the ack does not happen (for example, because the process died in the middle), RabbitMQ redelivers the message, and the next extractor listening on this queue will pick it up and process the file.
Code Block |
---|
language | java |
---|
title | onMessage |
---|
|
/**
 * Handles a single message delivered from RabbitMQ.
 * <p>
 * The body is parsed as JSON and must contain {@code host}, {@code id} and
 * {@code intermediateId} entries. The referenced file is downloaded and processed. The
 * message is acked only when both steps succeed. If a step fails, an error status is
 * sent and the message is rejected without requeueing, so it does not remain
 * unacknowledged on the channel. A final "Done" status is always sent, and a downloaded
 * file is always deleted.
 *
 * @param channel channel the message arrived on; used for ack/reject and status updates
 * @param tag     delivery tag of the message
 * @param header  AMQP properties of the message, passed on to status updates
 * @param body    message body as a JSON string
 */
public void onMessage(Channel channel, long tag, AMQP.BasicProperties header, String body) {
    File inputfile = null;
    String fileid = "";
    // becomes true once download and processing succeeded; after that the message is
    // being acked and must never also be rejected
    boolean processed = false;
    try {
        @SuppressWarnings("unchecked")
        Map<String, Object> jbody = mapper.readValue(body, Map.class);
        String host = jbody.get("host").toString();
        fileid = jbody.get("id").toString();
        String intermediatefileid = jbody.get("intermediateId").toString();
        // URLs are built as host + "api/...", so the host must end with a slash
        if (!host.endsWith("/")) {
            host += "/";
        }
        statusUpdate(channel, header, fileid, "Started processing file");
        // download the file
        inputfile = downloadFile(channel, header, host, secretKey, fileid, intermediatefileid);
        // process file
        processFile(channel, header, host, secretKey, fileid, intermediatefileid, inputfile);
        processed = true;
        // send ack that we are done
        channel.basicAck(tag, false);
    } catch (Throwable thr) {
        logger.error("Error processing file", thr);
        try {
            statusUpdate(channel, header, fileid, "Error processing file : " + thr.getMessage());
        } catch (IOException e) {
            logger.warn("Could not send status update.", e);
        }
        if (!processed) {
            // With manual acks, a message that is neither acked nor rejected stays
            // unacknowledged on this channel until the channel closes. Reject it
            // explicitly; requeue=false so a message that fails every time is not
            // redelivered in an endless loop.
            try {
                channel.basicReject(tag, false);
            } catch (Exception e) {
                // best effort: also covers the client's unchecked exceptions for a closed channel
                logger.warn("Could not reject message.", e);
            }
        }
    } finally {
        try {
            statusUpdate(channel, header, fileid, "Done");
        } catch (IOException e) {
            logger.warn("Could not send status update.", e);
        }
        if ((inputfile != null) && !inputfile.delete()) {
            logger.warn("Could not delete temporary file " + inputfile.getAbsolutePath());
        }
    }
} |
This function downloads the actual file from medici so that the local extractor can process it. The onMessage function takes care of removing the file once processing of the data has finished.
Code Block |
---|
language | java |
---|
title | downloadFile |
---|
|
/**
 * Downloads the intermediate file from medici into a new temporary file.
 * <p>
 * The caller owns the returned file and is responsible for deleting it. If the download
 * fails, the temporary file is deleted before the exception propagates, because the
 * caller never receives a reference to it and could not clean it up.
 *
 * @param channel            channel used to send the status update
 * @param header             AMQP properties of the message, passed to the status update
 * @param host               medici base URL; expected to end with "/"
 * @param key                API key, appended to the request URL as the {@code key} query parameter
 * @param fileid             id of the file, used for the status update
 * @param intermediatefileid id of the file that is downloaded
 * @return the temporary file holding the downloaded content
 * @throws IOException if the temporary file cannot be created or the download fails
 */
private File downloadFile(Channel channel, AMQP.BasicProperties header, String host, String key, String fileid, String intermediatefileid) throws IOException {
    statusUpdate(channel, header, fileid, "Downloading file");
    URL source = new URL(host + "api/files/" + intermediatefileid + "?key=" + key);
    File outputfile = File.createTempFile("medici", ".tmp");
    // safety net for files still present at JVM shutdown.
    // NOTE(review): each call registers the path until the JVM exits, so this list
    // grows for the lifetime of a long-running extractor.
    outputfile.deleteOnExit();
    boolean downloaded = false;
    try {
        FileUtils.copyURLToFile(source, outputfile);
        downloaded = true;
    } finally {
        if (!downloaded) {
            // do not leak the (possibly partial) temporary file on failure
            outputfile.delete();
        }
    }
    return outputfile;
}
|
Next is the main block of code. It processes the data and sends any extracted metadata or previews back to medici. The following example counts the lines, words and characters in the file and sends these counts back to medici as metadata.
Code Block |
---|
language | java |
---|
title | processFile |
---|
|
/**
 * Counts lines, words and characters in the downloaded file and posts the counts to
 * medici as metadata under the keys {@code lines}, {@code words} and {@code characters}.
 * <p>
 * Words are maximal runs of non-whitespace characters. {@code characters} is the total
 * length of those words, so whitespace and line terminators are not counted (unlike
 * {@code wc -m}).
 *
 * @param channel            channel used to send the status update
 * @param header             AMQP properties of the message, passed to the status update
 * @param host               medici base URL; expected to end with "/"
 * @param key                API key used when posting the metadata
 * @param fileid             id of the file the metadata is attached to
 * @param intermediatefileid id of the downloaded intermediate file (not used here)
 * @param inputfile          local copy of the file to analyse
 * @throws IOException if the file cannot be read or the metadata cannot be posted
 */
private void processFile(Channel channel, AMQP.BasicProperties header, String host, String key, String fileid, String intermediatefileid, File inputfile) throws IOException {
    statusUpdate(channel, header, fileid, "Counting words in file.");
    // word count similar to the unix wc utility
    int lines = 0, words = 0, characters = 0;
    // NOTE(review): FileReader decodes with the platform default charset; confirm the
    // expected encoding of input files.
    BufferedReader br = new BufferedReader(new FileReader(inputfile));
    try {
        String line;
        while ((line = br.readLine()) != null) {
            lines++;
            // split("\\s+") returns [""] for an empty line and a leading "" when the line
            // starts with whitespace; those empty tokens are not words
            for (String token : line.split("\\s+")) {
                if (token.length() > 0) {
                    words++;
                    characters += token.length();
                }
            }
        }
    } finally {
        // close the reader even when reading fails
        br.close();
    }
    // store results as metadata
    Map<String, Object> metadata = new HashMap<String, Object>();
    metadata.put("lines", lines);
    metadata.put("words", words);
    metadata.put("characters", characters);
    postMetaData(host, key, fileid, metadata);
}
|
Finally, the postMetaData function posts the metadata back to medici at the file's metadata endpoint (api/files/<fileid>/metadata).
Code Block |
---|
language | java |
---|
title | postMetaData |
---|
|
/**
 * Posts metadata for a file to medici as a JSON request body.
 *
 * @param host     medici base URL; expected to end with "/"
 * @param key      API key, appended to the request URL as the {@code key} query parameter
 * @param fileid   id of the file the metadata belongs to
 * @param metadata metadata serialized as the JSON request body
 * @return the response body, with line terminators removed
 * @throws IOException if the request fails or the server responds with a status other than 200
 */
private String postMetaData(String host, String key, String fileid, Map<String, Object> metadata) throws IOException {
    URL url = new URL(host + "api/files/" + fileid + "/metadata?key=" + key);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    DataOutputStream wr = new DataOutputStream(conn.getOutputStream());
    try {
        mapper.writeValue(wr, metadata);
        wr.flush();
    } finally {
        // close the request stream even if serialization fails
        wr.close();
    }
    int responseCode = conn.getResponseCode();
    if (responseCode != 200) {
        // the error body is never read, so release the connection explicitly
        conn.disconnect();
        throw new IOException("Error uploading metadata [code=" + responseCode + "]");
    }
    // decode explicitly instead of relying on the platform default charset.
    // NOTE(review): assumes the server responds in UTF-8 (JSON) — confirm.
    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
    try {
        StringBuilder response = new StringBuilder();
        String inputLine;
        while ((inputLine = in.readLine()) != null) {
            response.append(inputLine);
        }
        return response.toString();
    } finally {
        in.close();
    }
} |