
The Hadoop distribution currently doesn't support writing LZO-compressed output that is indexed at the same time it is written. This is extremely useful when the data one MapReduce job creates is used as the input of another: without an index, an LZO file cannot be split, so the next job has to read it with a single mapper. With a few changes to the default TextOutputFormat I managed to accomplish this. The idea is to create a dedicated LzoIndexedOutputFormat class and set it as the job's output format. An example is below.
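How a downstream job could use such an index can be sketched with the standard library alone. The sketch assumes the .index file is a flat sequence of 8-byte big-endian offsets, one per compressed block (our understanding of the layout hadoop-lzo writes); the class LzoIndexSketch, its methods and the offsets in main are hypothetical. alignToBlock moves a split boundary forward to the next block start, roughly the kind of adjustment an LZO-aware input format has to make before a mapper can begin decompressing mid-file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assumes the index is a flat sequence of 8-byte
// big-endian block offsets, one per compressed block of the .lzo file.
public class LzoIndexSketch {

    // Serialize offsets the way DataOutputStream.writeLong does (big-endian).
    static byte[] writeIndex(long[] blockOffsets) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (long offset : blockOffsets) {
                out.writeLong(offset);
            }
        }
        return bytes.toByteArray();
    }

    // Each entry is 8 bytes, so the entry count is the byte length / 8.
    static List<Long> readIndex(byte[] indexBytes) throws IOException {
        List<Long> offsets = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(indexBytes))) {
            for (int i = 0; i < indexBytes.length / 8; i++) {
                offsets.add(in.readLong());
            }
        }
        return offsets;
    }

    // Snap a split start forward to the first block that begins at or after it,
    // so a reader can start decompressing there. Returns -1 if none exists.
    static long alignToBlock(List<Long> offsets, long position) {
        for (long offset : offsets) {
            if (offset >= position) {
                return offset;
            }
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        // Made-up offsets for illustration only.
        long[] blocks = {50, 262_194, 524_338};
        List<Long> offsets = readIndex(writeIndex(blocks));
        System.out.println(offsets);                        // [50, 262194, 524338]
        System.out.println(alignToBlock(offsets, 100_000)); // 262194
    }
}
```

A split that would start at byte 100000 falls inside the first block, so it is pushed to 262194; the bytes before that point belong to the previous split.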

LzoIndexedOutputFormat class:

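import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.hadoop.compression.lzo.LzopCodec;

public class LzoIndexedOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // separator handling copied from the TextOutputFormat class in the Hadoop source code
        Configuration conf = job.getConfiguration();
        String keyValueSeparator = conf.get("mapred.textoutputformat.separator", "\t");

        // give the codec the job configuration so it can read its LZO settings
        LzopCodec lzopCodec = new LzopCodec();
        lzopCodec.setConf(conf);

        // e.g. part-r-00000.lzo plus its index part-r-00000.lzo.index
        Path filePath = getDefaultWorkFile(job, lzopCodec.getDefaultExtension());
        FileSystem fs = filePath.getFileSystem(conf);
        Path indexFilePath = new Path(filePath.toString() + ".index");

        FSDataOutputStream fileOut = fs.create(filePath, false);
        FSDataOutputStream fileIndexOut = fs.create(indexFilePath, false);

        // compressed data goes to fileOut, block offsets go to fileIndexOut
        OutputStream finalStream = lzopCodec.createIndexedOutputStream(fileOut, new DataOutputStream(fileIndexOut));

        return new LineRecordWriter<K, V>(new DataOutputStream(finalStream), keyValueSeparator);
    }
}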
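The idea behind createIndexedOutputStream can be pictured with a small stdlib-only analogue. This is hypothetical and not the hadoop-lzo implementation: java.util.zip.Deflater stands in for LZO, each block is framed as a 4-byte length plus payload (not the lzop format), and the class name IndexedBlockOutputStream is invented. It buffers input into fixed-size blocks, compresses each block, and writes the block's starting byte offset to a separate index stream as an 8-byte long, mirroring, as we understand it, the one-offset-per-block layout of the .index file.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Deflater;

// Hypothetical analogue of an "indexed" compressed stream. Deflater stands in
// for LZO and the 4-byte length framing is invented; this is not the lzop format.
public class IndexedBlockOutputStream extends OutputStream {
    private final DataOutputStream data;   // compressed blocks
    private final DataOutputStream index;  // one 8-byte offset per block
    private final byte[] buffer;           // uncompressed bytes of the current block
    private int buffered = 0;
    private long position = 0;             // bytes written to the data stream so far

    public IndexedBlockOutputStream(OutputStream data, DataOutputStream index, int blockSize) {
        this.data = new DataOutputStream(data);
        this.index = index;
        this.buffer = new byte[blockSize];
    }

    @Override
    public void write(int b) throws IOException {
        buffer[buffered++] = (byte) b;
        if (buffered == buffer.length) {
            flushBlock();
        }
    }

    // Compress the buffered bytes, record where the block starts, then write it.
    private void flushBlock() throws IOException {
        if (buffered == 0) {
            return;
        }
        Deflater deflater = new Deflater();
        deflater.setInput(buffer, 0, buffered);
        deflater.finish();
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            compressed.write(chunk, 0, n);
        }
        deflater.end();

        index.writeLong(position);         // block start offset goes to the index
        data.writeInt(compressed.size());  // invented framing: length prefix
        compressed.writeTo(data);
        position += 4 + compressed.size();
        buffered = 0;
    }

    @Override
    public void close() throws IOException {
        flushBlock();  // emit the final, possibly partial block
        data.close();
        index.close();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream dataBytes = new ByteArrayOutputStream();
        ByteArrayOutputStream indexBytes = new ByteArrayOutputStream();
        try (IndexedBlockOutputStream out =
                 new IndexedBlockOutputStream(dataBytes, new DataOutputStream(indexBytes), 4)) {
            out.write("hello, lzo".getBytes(StandardCharsets.US_ASCII));
        }
        // 10 input bytes in 4-byte blocks -> 3 blocks -> 3 index entries of 8 bytes
        System.out.println(indexBytes.size()); // 24
    }
}
```

Keeping the offsets in a separate stream leaves the compressed data itself unchanged, so a reader that ignores the index can still decompress it from the beginning, while a split-aware reader can jump straight to a recorded block start.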

To use the new OutputFormat, configure the job as follows:

Job job = new Job(new Configuration(), "lzo job");