Clojure integration with Java, notes


Goal: Define function in Clojure and call it from Java code

vanja-macbook-2:compile-example vanja$ ls -lh
drwxr-xr-x 2 vanja staff 68B Aug 11 16:24 classes
-rw-r-----@ 1 vanja staff 3.4M Aug 11 16:19 clojure-1.5.1.jar
-rw-r--r-- 1 vanja staff 34B Aug 11 16:20 test.clj

classes is an empty directory that will hold the compiled Clojure classes, clojure-1.5.1.jar is the Clojure JAR obtained from clojure.org, and the contents of test.clj are:

(ns test)
(defn f1 [x] (+ x 1))

To compile the Clojure code to Java bytecode, launch a Clojure REPL:

java -cp clojure-1.5.1.jar:. clojure.main

in the REPL:

(compile 'test)

this will generate bytecode for the test namespace (written to *compile-path*, which defaults to the classes directory); for now we have only the f1 function definition inside

vanja-macbook-2:compile-example vanja$ ls -lh classes/
total 32
-rw-r--r-- 1 vanja staff 910B Aug 11 16:25 test$f1.class
-rw-r--r-- 1 vanja staff 1.3K Aug 11 16:25 test$fn__4.class
-rw-r--r-- 1 vanja staff 1.4K Aug 11 16:25 test$loading__4910__auto__.class
-rw-r--r-- 1 vanja staff 2.7K Aug 11 16:25 test__init.class

and a Java class which will be used as the entry point:

vanja-macbook-2:compile-example vanja$ ls -lh
total 7024
-rw-r--r-- 1 vanja staff 202B Aug 11 16:32 HelloWorld.java
drwxr-xr-x 6 vanja staff 204B Aug 11 16:25 classes
-rw-r-----@ 1 vanja staff 3.4M Aug 11 16:19 clojure-1.5.1.jar
-rw-r--r-- 1 vanja staff 34B Aug 11 16:20 test.clj

contents of HelloWorld.java:

public class HelloWorld {
	public static void main(String[] args) {
		System.out.println("running main");
		test$f1 function = new test$f1();
		System.out.println("Result: " + function.invoke(1));
	}
}

compile HelloWorld

javac -cp clojure-1.5.1.jar:classes/ HelloWorld.java

final file list

vanja-macbook-2:compile-example vanja$ ls -lh
total 7032
-rw-r--r-- 1 vanja staff 837B Aug 11 16:33 HelloWorld.class
-rw-r--r-- 1 vanja staff 202B Aug 11 16:32 HelloWorld.java
drwxr-xr-x 6 vanja staff 204B Aug 11 16:25 classes
-rw-r-----@ 1 vanja staff 3.4M Aug 11 16:19 clojure-1.5.1.jar
-rw-r--r-- 1 vanja staff 34B Aug 11 16:20 test.clj

running

vanja-macbook-2:compile-example vanja$ java -cp clojure-1.5.1.jar:.:classes/ HelloWorld
running main
Result: 2
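
As a side note, instead of instantiating the generated test$f1 class directly, the same function can be looked up at runtime through the Clojure runtime classes. A minimal sketch (not part of the example above; it assumes the same test namespace and the same classpath):

import clojure.lang.IFn;
import clojure.lang.RT;
import clojure.lang.Symbol;

public class HelloWorldVar {
	public static void main(String[] args) {
		// load the test namespace (from source or from the AOT-compiled classes)
		RT.var("clojure.core", "require").invoke(Symbol.intern("test"));
		// look up the var and call it like any Clojure function
		IFn f1 = RT.var("test", "f1");
		System.out.println("Result: " + f1.invoke(1));
	}
}

This avoids referencing compiler-generated class names like test$f1 from Java code.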


Copying files between two Hadoop clusters running on different RPC versions inside single JVM


If you want to accomplish this, the easiest way would be to access the source cluster over HFTP (read only) and write to the destination cluster over HDFS. For finer-grained control you would require HDFS access to both clusters. I will try to describe one way to achieve that.

Idea: get the list of JARs (Hadoop client and dependencies) that are required for each of the Hadoop versions. Load all JARs required for a particular Hadoop version, plus a wrapper class (an implementation of an interface that both clusters will support), inside a URLClassLoader. Use the version-specific class loader to get a HadoopClient, the interface to a Hadoop cluster. Use only JDK classes, InputStream, OutputStream, String, to exchange data.

public interface HadoopClient {
    public void initialize(String path);

    public OutputStream streamForWriting(String path);

    public InputStream streamForReading(String path);

    public String[] list(String path);
}

To accomplish all this I use Maven. The project consists of the following modules:

hadoop-client-core – HadoopClient interface and HadoopClientLoader, the brain of the operation

hadoop-client-cdh* – implementations of HadoopClient for the CDH3 and CDH5 Cloudera Hadoop versions, plus their dependency setup

console – Console class, the showroom

hadoop-client-core doesn’t depend on any version of Hadoop, while each of the hadoop-client-cdh modules depends on the appropriate version. console depends on hadoop-client-core. The dependency between console and the hadoop-client-cdh modules is created at runtime over a URLClassLoader.

    public static HadoopClient getHadoopClientCdh3u5() {
        ClassLoader classLoader = null;
        List<URL> urls = new LinkedList<URL>();
        try {
            // JAR with the HadoopClient implementation for this Hadoop version
            urls.add(new URL(new URL("file:"), "./hadoop-client-cdh3/target/hadoop-client-cdh3-1.0-SNAPSHOT.jar"));

            // all of its dependencies, collected under lib/
            for (String path: (new File("./hadoop-client-cdh3/lib/")).list()) {
                urls.add(new URL(new URL("file:"), "./hadoop-client-cdh3/lib/" + path));
            }

            classLoader = URLClassLoader.newInstance(urls.toArray(new URL[urls.size()]));
            //Thread.currentThread().setContextClassLoader(classLoader);
            return (HadoopClient) classLoader.loadClass("com.mungolab.playground.hadoop.HadoopClientImpl").newInstance();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

Since the Configuration class needed for FileSystem initialization makes heavy use of the ClassLoader, I was required to set the ClassLoader context for the current Thread to get around a problem loading the DistributedFileSystem class for the hdfs:// scheme. I moved this from the Loader to each of the implementations:

    public void initialize(String path) {
        try {
            ClassLoader threadLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            Configuration config = new Configuration();
            config.set("fs.hdfs.impl", (new org.apache.hadoop.hdfs.DistributedFileSystem()).getClass().getName());
            this.fs = FileSystem.get(new URI(path), config);
            Thread.currentThread().setContextClassLoader(threadLoader);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Prototype code is shared on my GitHub; the code there exposes read/write over streams and list.

The Console class under the console module gives an idea how to initialize two clients, list a directory and copy a file between the two clusters.
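
A rough sketch of that flow, using only the HadoopClient interface above (the CDH5 loader method name, the namenode URIs and the file paths are assumptions for illustration, not the exact code from the repo):

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class CopyExample {

    // hypothetical helper illustrating the copy loop; the real Console class may differ
    public static void copy(HadoopClient source, HadoopClient destination,
                            String sourcePath, String destinationPath) throws IOException {
        InputStream in = source.streamForReading(sourcePath);
        OutputStream out = destination.streamForWriting(destinationPath);
        try {
            byte[] buffer = new byte[64 * 1024];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } finally {
            in.close();
            out.close();
        }
    }

    public static void main(String[] args) throws IOException {
        HadoopClient cdh3 = HadoopClientLoader.getHadoopClientCdh3u5();
        // assumed name for the second loader method
        HadoopClient cdh5 = HadoopClientLoader.getHadoopClientCdh5();
        cdh3.initialize("hdfs://source-namenode:8020/");
        cdh5.initialize("hdfs://destination-namenode:8020/");

        for (String path : cdh3.list("/data/input")) {
            System.out.println(path);
        }

        copy(cdh3, cdh5, "/data/input/part-00000", "/data/output/part-00000");
    }
}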

A Lein project was added during debugging, but it could be useful to test whether everything is working as expected:

(def client (com.mungolab.playground.hadoop.HadoopClientLoader/getHadoopClientCdh3u5))

Multiple versions of same class in single JVM


If you have two JARs containing the same class but at different versions (e.g. Hadoop clients for different versions of the CDH distribution) and want to use them in the same JVM, you will need to load each of them with a dedicated class loader. The following post will give you an idea how to do that. A working project is included on my GitHub.

Maven project layout. To simulate the situation with two JARs at different versions I created a Maven project with five modules. core contains the definition of the Client interface, which is implemented in both JARs; mutual-dependency contains a util class that both implementations use (I added this to test an issue with the assembly and shade plugins that I was unable to overcome with Shade). console is a wrapper module which depends on core and has a Console class with main declared. Both simple-client-v0 and simple-client-v1 depend on core and implement the Client interface with a ClientImpl class.

(image: multiple-versions-classloading-1)

At runtime the situation looks like the following image: Console is started from the command line, and core is loaded as part of the system class loader. Two dedicated class loaders are created, each containing the right version of the simple client and a copy of the mutual dependency.

(image: multiple-versions-classloading-2)

Loading a single class loader:

ClassLoader classLoaderV0 = null;
URL urlV0 = new URL(new URL("file:"), "./sample-client-v0/target/sample-client-v0-1.0-SNAPSHOT.jar");
URL mutualDependency = new URL(new URL("file://"), "./mutural-dependency/target/mutual-dependency-1.0-SNAPSHOT.jar");
classLoaderV0 = URLClassLoader.newInstance(new URL[] { urlV0, mutualDependency });
Client clientV0 = null;
clientV0 = (Client) classLoaderV0.loadClass("com.mungolab.playground.impl.ClientImpl").newInstance();

Test call for client:

System.out.println("Client V0 version: " + clientV0.getVersion());

To run the example use the following commands:

git clone https://github.com/vanjakom/multiple-versions-classloading.git
cd multiple-versions-classloading
mvn clean package
java -cp console/target/console-1.0-SNAPSHOT.jar:core/target/core-1.0-SNAPSHOT.jar com.mungolab.playground.console.Console

Output should look like this:

ClassLoaders created
Class: class com.mungolab.playground.impl.ClientImpl
Class: class com.mungolab.playground.impl.ClientImpl
Class: class com.mungolab.playground.impl.ClientImpl
Class: class com.mungolab.playground.impl.ClientImpl
Clients created
Client V0 version: 0.0-VER1
Client V1 version: 0.1-VER1
Shade uber jars not working
ClassLoaders created

Notes on the output. I tried both approaches here: creating a class loader with the separate JARs required, and using a single uberjar created with assembly and shade. The output shows the class name from all four class loaders (two with separate JARs and two with shade JARs). After the clients are created, the version of each is printed. The shade JARs call is wrapped in try/catch since it’s not working; I will try to fix this soon.

Tracking application exceptions with logger


If you have a service running in production, want to be sure that everything is working as expected, and the service doesn’t have a nice alerting system integrated, you can use the logging system instead and create simple alerting around it as a first-aid solution. My implementation is based on the log4j backend. The idea is to create a custom Appender which will get all logging events (if possible, filtered to some level) and report them to a maintainer or support (via email or some other way of communication).

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
public class AlerterAppender extends AppenderSkeleton {
    @Override
    protected void append(LoggingEvent loggingEvent) {
        // level should be set in log4j properties or other backend configuration
        // but just in case we will check it once more
        if (loggingEvent.getLevel() == Level.ERROR) {
            StringBuilder sb = new StringBuilder();
            sb.append("Error: ").append(loggingEvent.getMessage());
            sb.append(" at thread: " + loggingEvent.getThreadName()).append("\n");
            String[] stack = loggingEvent.getThrowableStrRep();
            if (stack != null) {
                for (String call: stack) {
                    sb.append(call).append("\n");
                }
            }

            // TODO send email
        }
    }

    @Override
    public void close() {
        // empty
    }

    @Override
    public boolean requiresLayout() {
        return false;
    }
}
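
One way to fill in the // TODO send email part is plain JavaMail; a minimal sketch, assuming an SMTP relay on localhost and placeholder addresses (not part of the original appender):

import java.util.Properties;

import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

public class MailAlert {
    // hypothetical helper; SMTP host and addresses are placeholders
    public static void send(String body) throws Exception {
        Properties props = new Properties();
        props.put("mail.smtp.host", "localhost");
        Session session = Session.getInstance(props);

        MimeMessage message = new MimeMessage(session);
        message.setFrom(new InternetAddress("alerts@example.com"));
        message.addRecipient(Message.RecipientType.TO, new InternetAddress("support@example.com"));
        message.setSubject("Application error");
        message.setText(body);
        Transport.send(message);
    }
}

Keep in mind that sending mail synchronously inside append can block logging threads, so in practice you would probably queue the alerts or rate-limit them.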

After you have your custom appender in place you only need to add it to the log4j configuration:

log4j.rootLogger=INFO, ..., Alerter
...
log4j.appender.Alerter=com.mungolab.AppenderTest.AlerterAppender
log4j.appender.Alerter.threshold=ERROR
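
If you prefer not to touch the properties file, the appender can also be attached programmatically with the standard log4j 1.x API (a sketch):

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class AlerterSetup {
    public static void install() {
        AlerterAppender alerter = new AlerterAppender();
        // mirror the threshold from the properties example above
        alerter.setThreshold(Level.ERROR);
        Logger.getRootLogger().addAppender(alerter);
    }
}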

Outputting LZO compressed and indexed data from Map Reduce tasks


The Hadoop distribution currently doesn’t support outputting LZO compressed data that is indexed at the same time. This feature is extremely useful if the data you create with one Map Reduce task is used as input to another. With some changes to the default TextOutputFormat I managed to accomplish this. The idea is to create a specific LzoOutputFormat class which will be set as the output format of the task. The example is below.

LzoIndexedOutputFormat class:

public class LzoIndexedOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // copied from TextOutputFormat class in Hadoop source code
        Configuration conf = job.getConfiguration();
        String keyValueSeparator =  conf.get("mapred.textoutputformat.separator", "\t");

        LzopCodec lzopCodec = new LzopCodec();
        lzopCodec.setConf(conf);

        Path filePath = getDefaultWorkFile(job, lzopCodec.getDefaultExtension());
        FileSystem fs = filePath.getFileSystem(conf);

        Path indexFilePath = new Path(filePath.toString() + ".index");

        FSDataOutputStream fileOut = fs.create(filePath, false);
        FSDataOutputStream fileIndexOut = fs.create(indexFilePath, false);

        OutputStream finalStream = lzopCodec.createIndexedOutputStream(fileOut, new DataOutputStream(fileIndexOut));

        return new LineRecordWriter<K, V>(new DataOutputStream(finalStream), keyValueSeparator);
    }
}

To use this newly created OutputFormat:

Job job = new Job(new Configuration(), "lzo job");
...
job.setOutputFormatClass(LzoIndexedOutputFormat.class);

job.waitForCompletion(true);
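
On the consuming side, the indexed .lzo output can then be used as splittable input for the next job. A sketch, assuming the same hadoop-lzo library that provides LzopCodec also provides an LzoTextInputFormat (the exact class and package name may differ between hadoop-lzo versions; the input path is a placeholder):

Job downstream = new Job(new Configuration(), "lzo consumer");
// LzoTextInputFormat reads the .index files written next to the .lzo output
downstream.setInputFormatClass(com.hadoop.mapreduce.LzoTextInputFormat.class);
FileInputFormat.addInputPath(downstream, new Path("/output/of/previous/job"));
...
downstream.waitForCompletion(true);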

Helper class for NSNotificationCenter


Last weekend I tried to modify one of my apps to use NSNotificationCenter instead of complicated custom code to notify more than one object about a change in data. For this purpose I wrote a simple helper class.

Header:

#import <Foundation/Foundation.h>
#import "NotificationDelegate.h"

@interface NotificationHelper : NSObject

+ (void)pushNotification:(NSString*)notification WithObject:(id)object;
+ (void)registerForNotification:(NSString*)notification WithDelegate:(id)delegate;
+ (void)unregisterForNotification:(id)delegate;

@end

Source:

#import "NotificationHelper.h"

@implementation NotificationHelper

+ (void)pushNotification:(NSString*)notification WithObject:(id)object
{
     [[NSNotificationCenter defaultCenter] postNotificationName:notification object:object];
}

+ (void)registerForNotification:(NSString*)notification WithDelegate:(id)delegate
{
     [[NSNotificationCenter defaultCenter] addObserver:delegate selector:@selector(reactOnNotification:) 
           name:notification object:nil];
}

+ (void)unregisterForNotification:(id)delegate
{
     [[NSNotificationCenter defaultCenter] removeObserver:delegate];
}

@end

and NotificationDelegate protocol:

#import <Foundation/Foundation.h>

@protocol NotificationDelegate

- (void)reactOnNotification:(NSNotification*)notification;

@end

Usage is pretty simple. All receivers should implement NotificationDelegate and register for notifications with a call to NotificationHelper. The code which should be executed when a message is received goes inside reactOnNotification:(NSNotification*)notification. Message senders should call NotificationHelper’s pushNotification when they have a message.

I pushed a sample application to my GitHub.

Enjoy.

First steps with Jade, Node template engine


While experimenting with Jade I tried to create the simplest possible server/client code for serving a simple HTML file generated with Jade. The result is two examples that I will try to explain briefly, one using the server side to generate HTML and a second using the client side.

The client side example uses Node as a server of Jade template files, which are fetched by Ajax on the client side, compiled, rendered and added to the HTML. For the server I used a modified version of Simple static file HTTP server with Node.js. The only change is one extra if for files with the jade extension, which will look inside the jade sub directory.

    } else if (filename.match(".jade$")) {
        contentType = "text/plain";
        pathToRead = "jade/" + filename;
    }

The magic happens on the client side during load of the HTML:

    $(document).ready(function() {
        $.ajax({url: "index.jade", success:function(data) {
            var fn = jade.compile(data);
            var html = fn({});
            document.write(html);
        }});
    });

The server side example uses Node to generate HTML which is pushed back to the client in rendered form. The code for the server is different from the one used in the first example. Basically, we assume that each HTML file has a corresponding Jade template, try to fetch that template, parse it, render it and serve it back to the client as HTML.

var fs = require("fs");
var url = require("url");
var jade = require("jade");
var connect = require("connect");

connect.createServer(function(req, res){
    var request = url.parse(req.url, false);
    var filename = request.pathname.slice(1);

    if (request.pathname == '/') {
        filename = 'index.html';
    }

	console.log("Serving request: " + request.pathname + " => " + filename);

    var jadeFilename = "jade/" + filename.slice(0, filename.lastIndexOf(".")) + ".jade";

    console.log("Serving jade file: " + jadeFilename);

	try {
		fs.realpathSync(jadeFilename);
	} catch (e) {
		res.writeHead(404);
		res.end();
		return;
	}

	fs.readFile(jadeFilename, function(err, data) {
		if (err) {
            console.log(err);
			res.writeHead(500);
			res.end();
			return;
		}

        res.writeHead(200, {"Content-Type": "text/html"});

        var fn = jade.compile(data);
        var html = fn({});

		res.write(html);
		res.end();
	});
}).listen(8080);

You can get the code for both examples from my GitHub repo. To start either of them, run node base/server.js inside the example directory.

Drawing line to UIImage using CoreGraphics [ iOS ]


How do you draw a single line into a UIImage using the Core Graphics framework? Recently I needed this, so here is the code (everything is straightforward -> no explanation needed 🙂):

    NSLog(@"Creating image");

    CGSize size = CGSizeMake(240.0f, 240.0f);
    UIGraphicsBeginImageContext(size);
    CGContextRef context = UIGraphicsGetCurrentContext();

    CGContextSetStrokeColorWithColor(context, [[UIColor blackColor] CGColor]);
    CGContextSetFillColorWithColor(context, [[UIColor whiteColor] CGColor]);

    CGContextFillRect(context, CGRectMake(0.0f, 0.0f, 240.0f, 240.0f));

    CGContextSetLineWidth(context, 5.0f);
    CGContextMoveToPoint(context, 100.0f, 100.0f);
    CGContextAddLineToPoint(context, 150.0f, 150.0f);
    CGContextStrokePath(context);

    UIImage* result = UIGraphicsGetImageFromCurrentImageContext();
    UIGraphicsEndImageContext();

    imageView.image = result;
    [imageView setNeedsDisplay];

    NSLog(@"Image creation finished");

The example can be found on my GitHub.

Best

Hadoop: How to get active jobs in cluster


Recently I was building Hadoop alerting infrastructure and needed something to track active jobs in the cluster.

So, for a start you need an instance of JobClient. JobClient is a wrapper around the JobTracker RPC; basically,
under the hood JobClient creates a JobSubmissionProtocol instance:

    public void init() throws IOException {
        String tracker = conf.get("mapred.job.tracker", "local");
        if ("local".equals(tracker)) {
          this.jobSubmitClient = new LocalJobRunner(conf);
        } else {
          this.jobSubmitClient = (JobSubmissionProtocol) 
            RPC.getProxy(JobSubmissionProtocol.class,
                         JobSubmissionProtocol.versionID,
                         JobTracker.getAddress(conf), conf);
        }        
    }

Let’s code:

import org.apache.hadoop.mapred.JobClient;

initialize JobClient instance:

JobClient jobClient = new JobClient(new InetSocketAddress(jobTrackerHost, jobTrackerPort), new Configuration());

where jobTrackerHost and jobTrackerPort are the host name and port where the Job Tracker is running …

To get the list of currently active jobs in the cluster, all you have to do is:

JobStatus[] activeJobs = jobClient.jobsToComplete();

This will give you the list of all active jobs; JobStatus is pretty useful. You can get the job id, the username that was used to submit the job, the start time…
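
For example, dumping a few of those fields (a sketch; the getters below exist on the old org.apache.hadoop.mapred.JobStatus, but exact names may vary slightly between Hadoop versions):

for (JobStatus status : activeJobs) {
    System.out.println("job: " + status.getJobID()
            + " user: " + status.getUsername()
            + " started: " + status.getStartTime()
            + " map: " + status.mapProgress()
            + " reduce: " + status.reduceProgress());
}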

I created sample code which outputs the number of active jobs and their info every n seconds; have a look on my GitHub.

Hadoop “Could not complete file …” issue


Recently I ran into a problem with Hadoop causing DFSClient to stop responding and run into an infinite while loop, displaying the “Could not complete file …” message. The Hadoop version is 0.20.1, svn revision 810220, but it seems to me from the code that this issue can occur on newer versions too.

The NameNode logs show that the file was created and blocks were assigned to it, but there is no complete message in the logs. In the DataNode logs there is an exception which looks like a network connectivity issue.

I found this issue on the web, HDFS-148, and it seems that I have the same problem. My biggest problem is that I cannot replicate the issue; it happens roughly once a month.

After some digging in the code I found the part that causes me trouble:

    
    private void completeFile() throws IOException {
      long localstart = System.currentTimeMillis();
      boolean fileComplete = false;
      while (!fileComplete) {
        fileComplete = namenode.complete(src, clientName);
        if (!fileComplete) {
          if (!clientRunning ||
                (hdfsTimeout > 0 &&
                 localstart + hdfsTimeout < System.currentTimeMillis())) {
              String msg = "Unable to close file because dfsclient " +
                            " was unable to contact the HDFS servers." +
                            " clientRunning " + clientRunning +
                            " hdfsTimeout " + hdfsTimeout;
              LOG.info(msg);
              throw new IOException(msg);
          }
          try {
            Thread.sleep(400);
            if (System.currentTimeMillis() - localstart > 5000) {
              LOG.info("Could not complete file " + src + " retrying...");
            }
          } catch (InterruptedException ie) {
          }
        }
      }
    }

So, as I can conclude from the logs, DFSClient entered this while loop and is constantly outputting:

LOG.info("Could not complete file " + src + " retrying...");

For some reason the file is never completed (the name node doesn’t have the complete call in its logs, probably some network issue), but completeFile should throw an IOException when this condition is fulfilled:

if (!clientRunning || (hdfsTimeout > 0 && localstart + hdfsTimeout < System.currentTimeMillis()))

By default hdfsTimeout is set to -1 and the client is running, so the piece of code that throws the exception is never executed. The code that sets hdfsTimeout in Client looks like:

  final public static int getTimeout(Configuration conf) {
    if (!conf.getBoolean("ipc.client.ping", true)) {
      return getPingInterval(conf);
    }
    return -1;
  }

I tried to read more about setting ping to false and found HADOOP-6099. I will try to play with disabling the ping, but it’s hard because I can’t recreate the issue.
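
For reference, a sketch of what disabling the ping would look like on the client Configuration, based on the getTimeout code above (the key names are taken from the Hadoop 0.20 Client class; the effect on the actual hang is untested):

Configuration conf = new Configuration();
// with ping disabled, Client.getTimeout returns the ping interval instead of -1,
// so hdfsTimeout becomes positive and completeFile can eventually throw IOException
conf.setBoolean("ipc.client.ping", false);
// ping interval, which then doubles as the RPC timeout
conf.setInt("ipc.ping.interval", 60000);
FileSystem fs = FileSystem.get(conf);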