If you want to accomplish this, the easiest way would be to access the source cluster over HFTP (read only) and write to the destination cluster over HDFS. For finer-grained control you need HDFS access to both clusters. I will try to describe one way to achieve that.

Idea: get the list of jars (Hadoop client and its dependencies) required for each Hadoop version. Load all jars required for a particular Hadoop version, plus a wrapper class (an implementation of an interface that both clusters will support), inside a URLClassLoader. Use that class loader to obtain a HadoopClient, the interface to the Hadoop cluster. Use only JDK classes, InputStream, OutputStream and String, to exchange data.

public interface HadoopClient {
    // connect to the cluster identified by the given URI
    public void initialize(String path);

    // open a stream for writing to the given path on the cluster
    public OutputStream streamForWriting(String path);

    // open a stream for reading from the given path on the cluster
    public InputStream streamForReading(String path);

    // list entry names under the given path
    public String[] list(String path);
}

To accomplish all this I use Maven. The project consists of the following modules:

hadoop-client-core – the HadoopClient interface and HadoopClientLoader, the brain of the operation

hadoop-client-cdh* – implementations of HadoopClient for the Cloudera CDH3 and CDH5 Hadoop versions, together with their dependency setup

console – the Console class, a showroom

hadoop-client-core doesn’t depend on any version of Hadoop; each hadoop-client-cdh module depends on the appropriate version. console depends on hadoop-client-core. The dependency between console and the hadoop-client-cdh modules is established at runtime through a URLClassLoader.

    public static HadoopClient getHadoopClientCdh3u5() {
        ClassLoader classLoader = null;
        List<URL> urls = new LinkedList<>();
        try {
            // jar containing the HadoopClient implementation for this Hadoop version
            urls.add(new URL(new URL("file:"), "./hadoop-client-cdh3/target/hadoop-client-cdh3-1.0-SNAPSHOT.jar"));

            // Hadoop client jars and their dependencies for this version
            for (String path: (new File("./hadoop-client-cdh3/lib/")).list()) {
                urls.add(new URL(new URL("file:"), "./hadoop-client-cdh3/lib/" + path));
            }

            classLoader = URLClassLoader.newInstance(urls.toArray(new URL[urls.size()]));
            //Thread.currentThread().setContextClassLoader(classLoader);
            return (HadoopClient) classLoader.loadClass("com.mungolab.playground.hadoop.HadoopClientImpl").newInstance();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

Since the Configuration class needed for FileSystem initialization relies on the thread context ClassLoader, I had to set the context ClassLoader of the current thread to get around the problem of loading the DistributedFileSystem class for the hdfs:// scheme. I moved this from the loader to each of the implementations:

    public void initialize(String path) {
        // temporarily switch the context ClassLoader so Configuration/FileSystem
        // can resolve the Hadoop classes loaded by our URLClassLoader
        ClassLoader threadLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            Configuration config = new Configuration();
            config.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            this.fs = FileSystem.get(new URI(path), config);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // restore the original context ClassLoader even if initialization fails
            Thread.currentThread().setContextClassLoader(threadLoader);
        }
    }
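
The remaining interface methods of an implementation are thin delegations to FileSystem. Here is a minimal sketch, assuming the fs field set in initialize() above and the usual org.apache.hadoop.fs.Path and FileStatus classes; the actual code on GitHub may differ:

    // sketch of the remaining HadoopClient methods, delegating to the fs field
    public OutputStream streamForWriting(String path) {
        try {
            return fs.create(new Path(path));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public InputStream streamForReading(String path) {
        try {
            return fs.open(new Path(path));
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    public String[] list(String path) {
        try {
            FileStatus[] statuses = fs.listStatus(new Path(path));
            String[] names = new String[statuses.length];
            for (int i = 0; i < statuses.length; i++) {
                names[i] = statuses[i].getPath().getName();
            }
            return names;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }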

Prototype code is shared on my GitHub; it exposes read/write over streams and list.

The Console class in the console module shows how to initialize two clients, list a directory and copy a file between two clusters.
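
A minimal sketch of such a copy, assuming a CDH5 loader method analogous to getHadoopClientCdh3u5 and example cluster URIs and paths (all hypothetical names, the real Console class may differ):

    // hypothetical loader method for CDH5, cluster URIs and file paths are examples only
    HadoopClient cdh3 = HadoopClientLoader.getHadoopClientCdh3u5();
    HadoopClient cdh5 = HadoopClientLoader.getHadoopClientCdh5();
    cdh3.initialize("hdfs://cdh3-namenode:8020/");
    cdh5.initialize("hdfs://cdh5-namenode:8020/");

    // copy a single file by piping the read stream of one cluster into the write stream of the other
    try (InputStream in = cdh3.streamForReading("/data/sample.txt");
         OutputStream out = cdh5.streamForWriting("/data/sample.txt")) {
        byte[] buffer = new byte[64 * 1024];
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
        }
    }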

A Lein project was added during debugging, but it could also be useful to test whether everything is working as expected:

(def client (com.mungolab.playground.hadoop.HadoopClientLoader/getHadoopClientCdh3u5))
