multiple-reduce, clojure macro

(defmacro multiple-reduce
  [bindings coll]
  `(reduce
     (fn [state# value#]
       ;; rebuild the state map, applying each binding's aggregate
       ;; function to its current ( or default ) value
       (apply
         array-map
         (apply
           concat
           (map
             (fn [[key# default# func#]]
               [key# (func# (get state# key# default#) value#)])
             (partition
               3
               ~bindings)))))
     {}
     ~coll))

Macro for performing multiple parallel aggregations over a sequence of objects, producing a single aggregate map.

multiple-reduce is defined in the clj-common project.

Example:

(multiple-reduce
  [
    :inc 0 #(+ %1 (inc %2))
    :dec 0 #(+ %1 (dec %2))]
  '(1 2 3))

will result in

{:inc 9, :dec 3}

Each binding is defined by a keyword that will hold the aggregated value once aggregation is finished, an initial value, and an aggregate function ( f(state, value) -> state ). The aggregate function takes two arguments, the aggregated value so far ( state ) and the current element ( value ), and returns the new state on each iteration. For the :inc binding above, the state evolves 0 → 2 → 5 → 9.

Example usage from my project:

(let [first-location (first locations-seq)
      aggregate (multiple-reduce
                  [
                    :max-longitude (:longitude first-location) (partial max-aggregate :longitude)
                    :min-longitude (:longitude first-location) (partial min-aggregate :longitude)
                    :max-latitude (:latitude first-location) (partial max-aggregate :latitude)
                    :min-latitude (:latitude first-location) (partial min-aggregate :latitude)
                    :distance [0 first-location] distance-aggregate
                    :last-location first-location last-location-aggregate
                    :activity-duration [0 first-location] duration-without-pause]
                  locations-seq)]
  
  ;continue computation on aggregate

  )

Where distance-aggregate is defined as

(defn distance-aggregate [[distance previous-location] location]
  [(+ distance (geo/distance-two-locations previous-location location)) location])
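max-aggregate, min-aggregate and the other helpers are not shown here; a minimal sketch of the min / max pair, inferred from the (partial max-aggregate :longitude) usage above:

(defn max-aggregate [key state location]
  (max state (get location key)))

(defn min-aggregate [key state location]
  (min state (get location key)))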

CloudKit as exchange between iOS app and JVM backend service

With CloudKit Web Services, Apple opened CloudKit access to non-Apple devices / applications / services. Access to the public CloudKit database as an admin user ( read / write permissions ) is available with a generated token and does not require any user login.

CloudKit can be used as a passive backend that iOS apps interact with, maintained by a backend JVM service that processes requests and updates data in a streaming or batch manner.

What interested me was developing a simple exchange that gives an iOS application the ability to write data which is then processed offline by a backend service, with the results of processing written back to another CloudKit “table”.

The first thing was to declare a new RecordType, here called Exchange, with the following fields:

type: String
timestamp: Date/Time
marked: Int(64)
data: Asset

type is intended for separating different kinds of exchange messages, timestamp represents the time when the message was put in the queue, marked is used to mark processed messages ( which can later be removed ), and data is an Asset containing the actual message. The message could be anything; in my case, JSON.
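As seen from the clj-cloudkit side later in this post, a fetched Exchange record comes back as a Clojure map; a rough sketch of the shape ( field values are hypothetical ):

{:recordName "2A4C8E..."
 :type "locations"
 :timestamp 1500000000000
 :marked 0
 :data {:downloadURL "https://..."}}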

Initialization of CloudKit exchange from iOS application

let container = CKContainer(identifier: containerName)
self.database = container.publicCloudDatabase
self.exchangeRecordType = exchangeRecordType // "Exchange"

Queue a message to the exchange ( maybe queue is too strong a word here ):

func put(
    _ type: String,
    dataUrl: URL,
    _ handler: @escaping (_ status: CloudKitExchangeRequestStatus) -> Void) -> Void {
    
    let exchangeMessage = CKRecord(recordType: self.exchangeRecordType)
    exchangeMessage.setObject(Date() as CKRecordValue?, forKey: "timestamp")
    exchangeMessage.setObject(0 as CKRecordValue?, forKey: "marked")
    exchangeMessage.setObject(type as CKRecordValue?, forKey: "type")
    
    let dataAsset = CKAsset(fileURL: dataUrl)
    exchangeMessage.setObject(dataAsset, forKey: "data")
    
    self.database.save(exchangeMessage) { (record, error) in
        if record != nil && error == nil {
            DispatchQueue.main.async {
                handler(.ok)
            }
        } else {
            DispatchQueue.main.async {
                handler(.fail)
            }
        }
    }
}

With CloudKitExchange in place, posting a message is easy:

cloudKitExchange.put("locations", dataUrl: FileUtils.getFullPathUrlInDocuments("locations.json")) { (status) in
    if status == .ok {
        self.statusLabel.text = "done"
    } else {
        self.statusLabel.text = "fail"
    }
}

Full code for CloudKitExchange can be found on github

To retrieve messages from the JVM backend app I used a simple CloudKit client developed for Clojure ( clj-cloudkit ):

(defn pop [client type]
  (let [records (cloudkit/records-query
                  client
                  *exchange-record-type*
                  (list
                    (cloudkit-filter/equals :marked 0)
                    (cloudkit-filter/equals :type type))
                  (list
                    (cloudkit-sort/descending :timestamp)))]
    (if-let [record (first records)]
      (let [data-url (:downloadURL (:data record))]
        (if-let [data (cloudkit/assets-download
                        client
                        data-url)]
          (do
            (cloudkit/records-modify
              client
              (list
                (cloudkit-operation/update
                  (assoc (select-keys record [:marked]) :marked 1)
                  *exchange-record-type*)))
            data))))))
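Usage, assuming client is an initialized clj-cloudkit client and *exchange-record-type* is bound to "Exchange":

(pop client "locations")
;; => message data ( JSON in my case ), or nil if no unprocessed message exists

Note that pop shadows clojure.core/pop in this namespace.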

Full code for the clj-cloudkit client and exchange can be found on github

boot-clj for building simple script

# install

sudo bash -c "cd /usr/local/bin && curl -fsSLo boot https://github.com/boot-clj/boot-bin/releases/download/latest/boot.sh && chmod 755 boot"

# simple script ( simple.sh )

#!/usr/bin/env boot
(set-env! :dependencies '[[org.apache.commons/commons-lang3 "3.5"]])
(println (org.apache.commons.lang3.StringUtils/join ["a" "b" "c"] ","))

# run

./simple.sh
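On first run boot fetches the declared dependency and caches it locally; the script should then print:

a,b,c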

Apple CloudKit Server to Server request in JVM environment

Three things are needed for this code to work: the private key ( as hex ), the key id ( the one given in CloudKit Dashboard ) and the container id. The code demonstrates a call to the /users/current route, which returns the assigned userid.

To get the private key in the required format from the one created following Apple documentation, use the following command:

 openssl ec -in key.pem -noout -text

The private key is printed under priv: as colon-separated hex bytes; concatenate them by removing the colons ( 00:b5:1d:… becomes 00b51d… ).

Container id should start with iCloud.

The path for me to make this work was: first ensure that the node.js code is working, then play with the JVM setup. I used the super useful ECDSA Online checker to first make sure signing is working…

(user-request
	"00b51d4..."
	"b6367f3..."
	"iCloud....")

 

Clojure code:

(require '[clj-http.client :as http])
(require '[clojure.data.json :as json])

(defn create-signature-fn [private-key-hex]
  (let [keypair-generator (java.security.KeyPairGenerator/getInstance "EC")
        prime256v1-gen-param-spec (new java.security.spec.ECGenParameterSpec "secp256r1")]
    (.initialize keypair-generator prime256v1-gen-param-spec)
    (let [ec-params (.getParams (.getPrivate (.generateKeyPair keypair-generator)))
          private-key-bigint (new java.math.BigInteger private-key-hex 16)
          private-key-specs (new java.security.spec.ECPrivateKeySpec private-key-bigint ec-params)
          key-factory (java.security.KeyFactory/getInstance "EC")
          private-key (.generatePrivate key-factory private-key-specs)
          signature (java.security.Signature/getInstance "SHA256withECDSA")
          base64-encoder (java.util.Base64/getEncoder)]
      (.initSign signature private-key)
      (fn [body]
        (.update signature (.getBytes body))
        (.encodeToString base64-encoder (.sign signature))))))

(defn user-request [private-key-hex key-id cloudkit-container]
  (let [simple-date-formatter (new java.text.SimpleDateFormat "yyyy-MM-dd'T'HH:mm:ss'Z'")
        timezone (java.util.TimeZone/getTimeZone "UTC")
        base64-encoder (java.util.Base64/getEncoder)
        sha256-digest (java.security.MessageDigest/getInstance "SHA-256")
        signature-fn (create-signature-fn private-key-hex)]
    (.setTimeZone simple-date-formatter timezone)
    (let [request-date-iso (.format simple-date-formatter (new java.util.Date))
          subpath (str "/database/1/" cloudkit-container "/development/public/users/current")
          body ""
          body-sha256 (.digest sha256-digest (.getBytes body))
          body-sha256-base64 (.encodeToString base64-encoder body-sha256)
          sign-request (str request-date-iso ":" body-sha256-base64 ":" subpath)
          signature (signature-fn sign-request)
          headers {
                    "X-Apple-CloudKit-Request-KeyID" key-id
                    "X-Apple-CloudKit-Request-ISO8601Date" request-date-iso
                    "X-Apple-CloudKit-Request-SignatureV1" signature}]
      (let [response (http/get
                       (str "https://api.apple-cloudkit.com" subpath)
                       {
                         :headers headers
                         :insecure? true})]
        (json/read-str (:body response))))))

Java code:

import com.fasterxml.jackson.databind.ObjectMapper;

import java.math.BigInteger;
import java.net.HttpURLConnection;
import java.net.URL;
import java.security.*;
import java.security.interfaces.ECKey;
import java.security.spec.AlgorithmParameterSpec;
import java.security.spec.ECGenParameterSpec;
import java.security.spec.ECParameterSpec;
import java.security.spec.ECPrivateKeySpec;
import java.text.SimpleDateFormat;
import java.util.*;

public class Sample {
    public static void main(String[] args) throws Exception {
        String privateKeyHex = args[0];
        String keyId = args[1];
        String cloudKitContainer = args[2];

        KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("EC");
        AlgorithmParameterSpec prime256v1ParamSpec = new ECGenParameterSpec("secp256r1");

        keyPairGenerator.initialize(prime256v1ParamSpec);

        ECParameterSpec parameterSpec = ((ECKey)keyPairGenerator.generateKeyPair().getPrivate()).getParams();

        BigInteger privateKeyInt = new BigInteger(privateKeyHex, 16);

        ECPrivateKeySpec privateKeySpec = new ECPrivateKeySpec(privateKeyInt, parameterSpec);
        KeyFactory keyFactory = KeyFactory.getInstance("EC");
        PrivateKey privateKey = keyFactory.generatePrivate(privateKeySpec);
        Signature signature = Signature.getInstance("SHA256withECDSA");

        signature.initSign(privateKey);

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));

        Base64.Encoder encoder = Base64.getEncoder();

        MessageDigest sha256Digest = MessageDigest.getInstance("SHA-256");

        String requestDateIso = simpleDateFormat.format(new Date());
        String subPath = "/database/1/" + cloudKitContainer +  "/development/public/users/current";
        String body = "";
        byte[] bodySha256 = sha256Digest.digest(body.getBytes());
        String bodySha256Base64 = encoder.encodeToString(bodySha256);

        String signRequest = requestDateIso + ":" + bodySha256Base64 + ":" + subPath;
        signature.update(signRequest.getBytes());
        String signedRequest = encoder.encodeToString(signature.sign());

        URL url = new URL("https://api.apple-cloudkit.com" + subPath);

        HttpURLConnection connection = (HttpURLConnection)url.openConnection();

        connection.setRequestMethod("GET");

        connection.setRequestProperty("X-Apple-CloudKit-Request-KeyID", keyId);
        connection.setRequestProperty("X-Apple-CloudKit-Request-ISO8601Date", requestDateIso);
        connection.setRequestProperty("X-Apple-CloudKit-Request-SignatureV1", signedRequest);

        ObjectMapper objectMapper = new ObjectMapper();
        Map<String, Object> response = objectMapper.readValue(connection.getInputStream(),
                objectMapper.getTypeFactory().constructMapType(
                    HashMap.class,
                    String.class,
                    Object.class));

        System.out.println(response.get("userRecordName"));
    }
}

Notes:

time should be UTC, formatted with a literal Z instead of a zone offset

Working node.js code for reference:

(function() {
	console.log("starting");

	var fs = require('fs');
	var crypto = require('crypto');

	var message = "test";

	var key = fs.readFileSync("key.pem", "utf8");

	var signature = crypto.createSign("sha256");
	signature.update(message);

	var signedMessage = signature.sign(key, "base64");

	console.log(signedMessage);
})();

Links:

[apple] Composing Web Service Requests

[stackoverflow] CloudKit Server-to-Server authentication

ECDSA Online checker

ANDREA CORBELLINI – Elliptic Curve Cryptography: a gentle introduction

[nodejs] Cloud Kit Web Services using server to server token

Share common code between Clojure projects

If you want to create some common Clojure code that will be reused between multiple projects, but the code is not yet mature enough to become an independent library, lein has a solution for that. It’s called checkouts. There is tons of documentation and other posts online. Something I didn’t get at first is how dependencies of the shared ( common ) project are resolved.

Let’s assume the following organization:

clojure-repl - common code project / one that will be shared
photo-db-server - project you are working on

First you should go into clojure-repl and do lein install, which will generate a pom file and install the jar and pom into the local maven repo. In case you want Java-like naming you should use com.domain/project-name inside project.clj for the project name.
After that go to photo-db-server and add to dependencies:

[com.domain/project-name "version"]
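For example, a hypothetical project.clj for photo-db-server ( group, names and versions are illustrative ):

(defproject com.domain/photo-db-server "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [com.domain/clojure-repl "0.1.0-SNAPSHOT"]])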

Next comes checkouts: create a checkouts directory in the project root ( in photo-db-server ) and link the root of the common project inside it, e.g. ln -s ../../clojure-repl checkouts/clojure-repl run from the photo-db-server root. The photo-db-server directory structure should look like:

photo-db-server
# project.clj
# checkouts
## clojure-repl ( link to ../../clojure-repl )

By adding the dependency on clojure-repl to the photo-db-server project we ensured all dependencies of clojure-repl will be followed ( by examining its pom in the local maven repo ).
You can examine the classpath with:

lein classpath

It should contain dependencies from both the common project and the project you are working on.

Clojure integration with Java, notes

Goal: define a function in Clojure and call it from Java code.

vanja-macbook-2:compile-example vanja$ ls -lh
drwxr-xr-x 2 vanja staff 68B Aug 11 16:24 classes
-rw-r-----@ 1 vanja staff 3.4M Aug 11 16:19 clojure-1.5.1.jar
-rw-r--r-- 1 vanja staff 34B Aug 11 16:20 test.clj

classes is an empty directory that will hold the compiled Clojure classes, clojure-1.5.1.jar is the Clojure JAR obtained from clojure.org, and the contents of test.clj are:

(ns test)
(defn f1 [x] (+ x 1))

To compile Clojure to Java bytecode, launch a Clojure REPL:

java -cp clojure-1.5.1.jar:. clojure.main

in the REPL

(compile 'test)

This will generate bytecode for the test namespace ( compile writes class files to *compile-path*, which defaults to classes ); for now it contains only the f1 function definition.

vanja-macbook-2:compile-example vanja$ ls -lh classes/
total 32
-rw-r--r-- 1 vanja staff 910B Aug 11 16:25 test$f1.class
-rw-r--r-- 1 vanja staff 1.3K Aug 11 16:25 test$fn__4.class
-rw-r--r-- 1 vanja staff 1.4K Aug 11 16:25 test$loading__4910__auto__.class
-rw-r--r-- 1 vanja staff 2.7K Aug 11 16:25 test__init.class

and a Java class which will be used as the entry point:

vanja-macbook-2:compile-example vanja$ ls -lh
total 7024
-rw-r--r-- 1 vanja staff 202B Aug 11 16:32 HelloWorld.java
drwxr-xr-x 6 vanja staff 204B Aug 11 16:25 classes
-rw-r-----@ 1 vanja staff 3.4M Aug 11 16:19 clojure-1.5.1.jar
-rw-r--r-- 1 vanja staff 34B Aug 11 16:20 test.clj

contents of HelloWorld

public class HelloWorld {
	public static void main(String[] args) {
		System.out.println("running main");
		test$f1 function = new test$f1();
		System.out.println("Result: " + function.invoke(1));
	}
}

compile HelloWorld

javac -cp clojure-1.5.1.jar:classes/ HelloWorld.java

final file list

vanja-macbook-2:compile-example vanja$ ls -lh
total 7032
-rw-r--r-- 1 vanja staff 837B Aug 11 16:33 HelloWorld.class
-rw-r--r-- 1 vanja staff 202B Aug 11 16:32 HelloWorld.java
drwxr-xr-x 6 vanja staff 204B Aug 11 16:25 classes
-rw-r-----@ 1 vanja staff 3.4M Aug 11 16:19 clojure-1.5.1.jar
-rw-r--r-- 1 vanja staff 34B Aug 11 16:20 test.clj

running

vanja-macbook-2:compile-example vanja$ java -cp clojure-1.5.1.jar:.:classes/ HelloWorld
running main
Result: 2
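The test$f1 class name is an implementation detail of the Clojure compiler. For a stable, Java-friendly entry point :gen-class can emit a named class with static methods; a minimal sketch ( the test.Api name and method mapping are my additions, not part of the original example ):

(ns test
  (:gen-class
    :name test.Api
    :methods [^{:static true} [f1 [long] long]]))

(defn f1 [x] (+ x 1))

;; backing implementation for the generated static method test.Api.f1
(defn -f1 [x] (f1 x))

After (compile 'test) as above, Java code can call test.Api.f1(1) directly.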

Copying files between two Hadoop clusters running on different RPC versions inside single JVM

If you want to accomplish this, the easiest way would be to access the source cluster over HFTP ( read only ) and write to the destination cluster over HDFS. For finer-grained control you would require HDFS access to both clusters. I will try to describe one way to achieve that.

Idea: get the list of jars ( hadoop client and dependencies ) required for each Hadoop version. Load all jars required for a particular Hadoop version, plus a wrapper class ( an implementation of an interface both clusters will support ), inside a URLClassLoader. Use that specific class loader to obtain a HadoopClient, the interface to the Hadoop cluster. Use only JDK classes ( InputStream, OutputStream, String ) to exchange data across class loaders.

public interface HadoopClient {
    public void initialize(String path);

    public OutputStream streamForWriting(String path);

    public InputStream streamForReading(String path);

    public String[] list(String path);
}

To accomplish all this I exploit Maven. The project consists of the following modules:

hadoop-client-core – HadoopClient interface and HadoopClientLoader, the brain of the operation

hadoop-client-cdh* – implementations of HadoopClient for CDH3 and CDH5 Cloudera Hadoop versions and dependencies setup

console – Console class, showroom

hadoop-client-core doesn’t depend on any version of Hadoop, while each of the hadoop-client-cdh modules depends on the appropriate version. console depends on hadoop-client-core. The dependency between console and the hadoop-client-cdh modules is created via URLClassLoader.

    public static HadoopClient getHadoopClientCdh3u5() {
        ClassLoader classLoader = null;
        List<URL> urls = new LinkedList<URL>();
        try {
            urls.add(new URL(new URL("file:"), "./hadoop-client-cdh3/target/hadoop-client-cdh3-1.0-SNAPSHOT.jar"));

            for (String path: (new File("./hadoop-client-cdh3/lib/")).list()) {
                urls.add(new URL(new URL("file:"), "./hadoop-client-cdh3/lib/" + path));
            }

            classLoader = URLClassLoader.newInstance(urls.toArray(new URL[urls.size()]));
            //Thread.currentThread().setContextClassLoader(classLoader);
            return (HadoopClient)classLoader.loadClass("com.mungolab.playground.hadoop.HadoopClientImpl").newInstance();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

Since the Configuration class needed for FileSystem initialization hardly uses the class loader it was loaded with ( it consults the thread context ClassLoader instead ), I was required to set the ClassLoader context for the current Thread to get past a problem loading the DistributedFileSystem class for the hdfs:// scheme. I moved this from the Loader to each of the implementations:

    public void initialize(String path) {
        try {
            ClassLoader threadLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
            Configuration config = new Configuration();
            config.set("fs.hdfs.impl", (new org.apache.hadoop.hdfs.DistributedFileSystem()).getClass().getName());
            this.fs = FileSystem.get(new URI(path), config);
            Thread.currentThread().setContextClassLoader(threadLoader);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Prototype code is shared on my github; it exposes read / write over streams and list.

The Console class under the console module gives an idea how to initialize two clients, list a directory and copy a file between two clusters.

A lein project was added during debugging, but it could be useful to test if everything is working as expected:

(def client (com.mungolab.playground.hadoop.HadoopClientLoader/getHadoopClientCdh3u5))
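Combining two loaded clients gives cross-cluster copy directly from the REPL; a sketch ( getHadoopClientCdh5, cluster URIs and paths are hypothetical, the second loader follows the same pattern as the CDH3 one ):

(let [source (com.mungolab.playground.hadoop.HadoopClientLoader/getHadoopClientCdh3u5)
      destination (com.mungolab.playground.hadoop.HadoopClientLoader/getHadoopClientCdh5)]
  (.initialize source "hdfs://source-namenode:8020/")
  (.initialize destination "hdfs://destination-namenode:8020/")
  ;; streams are plain JDK types, so they can cross class loader boundaries
  (with-open [in (.streamForReading source "/data/sample")
              out (.streamForWriting destination "/data/sample")]
    (clojure.java.io/copy in out)))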

 

Multiple versions of same class in single JVM

If you have two JARs with the same class but at different versions ( e.g. Hadoop clients for different versions of the CDH distribution ) and want to use them in the same JVM, you will need to load each of them with a dedicated classloader. The following post will give you an idea how to do that. A working project is included on my github.

Maven project layout. To simulate the situation with two JARs at different versions I created a Maven project with five modules. core contains the definition of the Client interface which is implemented in both JARs. mutual-dependency contains a util class that both implementations use ( I added this to test an issue with the assembly and shade plugins which I was unable to overcome with Shade ). console is a wrapper module which depends on core and has a Console class with main declared. Both simple-client-v0 and simple-client-v1 depend on core and implement the Client interface with a ClientImpl class.

( image: multiple-versions-classloading-1 )

At runtime the situation looks like the following image. Console is started from the command line, and core is loaded as part of the system class loader. Two dedicated class loaders are created, each containing the right version of the simple client and a copy of the mutual dependency.

( image: multiple-versions-classloading-2 )

Creating one of the dedicated class loaders:

ClassLoader classLoaderV0 = null;
URL urlV0 = new URL(new URL("file:"), "./sample-client-v0/target/sample-client-v0-1.0-SNAPSHOT.jar");
URL mutualDependency = new URL(new URL("file:"), "./mutual-dependency/target/mutual-dependency-1.0-SNAPSHOT.jar");
classLoaderV0 = URLClassLoader.newInstance(new URL[] { urlV0, mutualDependency });
Client clientV0 = null;
clientV0 = (Client) classLoaderV0.loadClass("com.mungolab.playground.impl.ClientImpl").newInstance();

Test call for client:

System.out.println("Client V0 version: " + clientV0.getVersion());

To run the example use the following commands:

git clone https://github.com/vanjakom/multiple-versions-classloading.git
cd multiple-versions-classloading
mvn clean package
java -cp console/target/console-1.0-SNAPSHOT.jar:core/target/core-1.0-SNAPSHOT.jar com.mungolab.playground.console.Console

Output should look like this:

ClassLoaders created
Class: class com.mungolab.playground.impl.ClientImpl
Class: class com.mungolab.playground.impl.ClientImpl
Class: class com.mungolab.playground.impl.ClientImpl
Class: class com.mungolab.playground.impl.ClientImpl
Clients created
Client V0 version: 0.0-VER1
Client V1 version: 0.1-VER1
Shade uber jars not working
ClassLoaders created

Notes on the output. I tried both approaches here: creating class loaders with the separate jars required, and using a single uberjar created with assembly and shade. The output shows the class name from all four class loaders ( two with separate jars and two with shade jars ). After the clients are created, the version of each is printed. The shade JARs call is wrapped in try/catch since it’s not working; I will try to fix this soon.

Tracking application exceptions with logger

If you have a service running in production and you want to be sure that everything is working as expected, but the service doesn’t have a nice alerting system integrated, you can use the logging system instead and create simple alerting around it as a first-aid solution. My implementation is based on the log4j backend. The idea is to create a custom Appender which will receive all logging events ( if possible, filtered to some level ) and report them to a maintainer or support ( via email or some other means of communication ).

import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;

public class AlerterAppender extends AppenderSkeleton {
    @Override
    protected void append(LoggingEvent loggingEvent) {
        // level should be set in log4j properties or other backend configuration
        // but just in case we will check it once more
        if (loggingEvent.getLevel().isGreaterOrEqual(Level.ERROR)) {
            StringBuilder sb = new StringBuilder();
            sb.append("Error: ").append(loggingEvent.getMessage());
            sb.append(" at thread: " + loggingEvent.getThreadName()).append("\n");
            String[] stack = loggingEvent.getThrowableStrRep();
            if (stack != null) {
                for (String call: stack) {
                    sb.append(call).append("\n");
                }
            }

            // TODO send email
        }
    }

    @Override
    public void close() {
        // empty
    }

    @Override
    public boolean requiresLayout() {
        return false;
    }
}

After you have your custom appender in place you only need to add it to the log4j configuration ( the threshold property is handled by AppenderSkeleton, so append will only receive events at or above ERROR ):

log4j.rootLogger=INFO, ..., Alerter
...
log4j.appender.Alerter=com.mungolab.AppenderTest.AlerterAppender
log4j.appender.Alerter.threshold=ERROR
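A quick way to exercise the appender from a Clojure REPL, with log4j and the configuration above on the classpath ( a sketch, not part of the original post ):

(import 'org.apache.log4j.Logger)

(.error (Logger/getLogger "alerter-test")
        "something failed"
        (RuntimeException. "boom"))

This logs at ERROR level with a throwable, so append should build the full message including the stack trace.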

Outputting LZO compressed and indexed data from Map Reduce tasks

The Hadoop distribution currently doesn’t support outputting LZO-compressed data that is indexed at the same time. This feature is extremely useful if data you create with one Map Reduce task is used as input to another. With some changes to the default TextOutputFormat I managed to accomplish this. The idea is to create a specific LzoOutputFormat class which will be set as the output format of the task. An example is below.

LzoIndexedOutputFormat class:

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// requires the hadoop-lzo library
import com.hadoop.compression.lzo.LzopCodec;

public class LzoIndexedOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // copied from TextOutputFormat class in Hadoop source code
        Configuration conf = job.getConfiguration();
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");

        LzopCodec lzopCodec = new LzopCodec();
        lzopCodec.setConf(conf);

        // data file gets the codec's extension ( .lzo ), index file sits next to it
        Path filePath = getDefaultWorkFile(job, lzopCodec.getDefaultExtension());
        FileSystem fs = filePath.getFileSystem(conf);

        Path indexFilePath = new Path(filePath.toString() + ".index");

        FSDataOutputStream fileOut = fs.create(filePath, false);
        FSDataOutputStream fileIndexOut = fs.create(indexFilePath, false);

        // hadoop-lzo writes the LZO index while compressing the main stream
        OutputStream finalStream = lzopCodec.createIndexedOutputStream(fileOut, new DataOutputStream(fileIndexOut));

        return new LineRecordWriter<K, V>(new DataOutputStream(finalStream), keyValueSeparator);
    }
}

To use this newly created OutputFormat:

Job job = new Job(new Configuration(), "lzo job");
...
job.setOutputFormatClass(LzoIndexedOutputFormat.class);

job.waitForCompletion(true);