A Bloom filter is a pretty useful tool when writing MapReduce jobs. With the constraint that it produces a certain percentage of false positives, a Bloom filter is an ideal space-efficient way to get rid of irrelevant records during the map phase of a MapReduce job.

The scenario is the following. Let’s say you have 200M records from which you want to select 20M (filtering by id) and add some data (additional_data) on top of each selected record.

Solution 1, without a Bloom filter: feed the records, the additional data and the id list into the map input. Map them to (id, record), (id, additional_data) and (id, “yes”) pairs. Inside reduce, if an id has a corresponding “yes” value, apply the additional data to the record and output (null, record + additional_data).
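
A minimal sketch of such a reduce-side join reducer, written against the new Hadoop API; the class name and the “DATA:” prefix used to tell the additional data apart from the record are mine, not taken from the GitHub project:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reduce-side join: for every id, collect the record, the additional data and the
// "yes" marker; emit the joined record only when the marker is present.
public class JoinReducer extends Reducer<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void reduce(LongWritable id, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String record = null;
        String additionalData = null;
        boolean wanted = false;
        for (Text value : values) {
            String v = value.toString();
            if (v.equals("yes")) {
                wanted = true;                  // id is on the wanted list
            } else if (v.startsWith("DATA:")) { // illustrative convention for additional data
                additionalData = v.substring(5);
            } else {
                record = v;                     // otherwise it is the record itself
            }
        }
        if (wanted && record != null) {
            context.write(NullWritable.get(), new Text(record + "," + additionalData));
        }
    }
}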

Solution 2, without a Bloom filter, but with a HashSet inside map: now we feed only the records and the additional data into the map input, but in map:setup we load a HashSet from HDFS with the record ids that should be passed on to the reduce phase. During map:call we output only the records whose ids are in the HashSet. The reducer now receives only the required records; the one thing left is to apply the additional data and output the record.
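
A sketch of the Solution 2 mapper; the configuration key filter.ids.path and the assumption that the id is the first comma-separated field of a record are my illustrative choices, not the project’s:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Map-side filtering with an in-memory HashSet of wanted ids.
public class HashSetFilterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private final Set<Long> wantedIds = new HashSet<Long>();

    @Override
    protected void setup(Context context) throws IOException {
        // Load the id list from HDFS; the path is passed through the job configuration.
        Path idsPath = new Path(context.getConfiguration().get("filter.ids.path"));
        FileSystem fs = FileSystem.get(context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(idsPath)));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                wantedIds.add(Long.parseLong(line.trim()));
            }
        } finally {
            reader.close();
        }
    }

    @Override
    protected void map(LongWritable offset, Text value, Context context)
            throws IOException, InterruptedException {
        // Assume the record's first comma-separated field is the id.
        long id = Long.parseLong(value.toString().split(",", 2)[0]);
        if (wantedIds.contains(id)) {
            context.write(new LongWritable(id), value); // pass only wanted records to reduce
        }
    }
}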

Let’s review Solution 1: small memory footprint, easy to write and use, but large IO between map and reduce (roughly 90% of the data we transfer between nodes is never used).

Solution 2 reviewed: to make this work on a cluster we need much more memory for each map task on the nodes -> the number of parallel map tasks must be reduced -> longer execution time.

Solution 3, use a Bloom filter. Same scenario as in Solution 2, but instead of a HashSet we use a Bloom filter pre-calculated over the id set inside the map tasks, and into the map input we put the records, the additional data and the ids of the required records. The Bloom filter can be pre-calculated locally or with a simple MapReduce job. In reduce we get our 20M records plus a few percent of unwanted ones, which we filter out as in Solution 1. This gives us the low IO between nodes of Solution 2 and the small memory footprint of Solution 1.
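
A sketch of the Solution 3 mapper. To keep it self-contained it uses Hadoop’s built-in org.apache.hadoop.util.bloom.BloomFilter rather than the project’s own class, and it assumes the serialized filter was shipped via the Distributed Cache (the driver side is sketched a bit further below):

import java.io.IOException;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

// Map-side pre-filter; false positives slip through here and are removed in reduce.
public class BloomFilterMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private final BloomFilter filter = new BloomFilter();

    @Override
    protected void setup(Context context) throws IOException {
        // The serialized filter was placed in the Distributed Cache by the driver.
        Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        FSDataInputStream in =
                FileSystem.getLocal(context.getConfiguration()).open(cached[0]);
        try {
            filter.readFields(in);
        } finally {
            in.close();
        }
    }

    @Override
    protected void map(LongWritable offset, Text value, Context context)
            throws IOException, InterruptedException {
        // Assume the record's first comma-separated field is the numeric id;
        // the same key encoding must be used when building the filter.
        String id = value.toString().split(",", 2)[0];
        if (filter.membershipTest(new Key(id.getBytes("UTF-8")))) { // may pass a few unwanted ids
            context.write(new LongWritable(Long.parseLong(id)), value);
        }
    }
}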

The Bloom filter implementation is well explained in the links given below, so I will not go into detail about it. The main part of the implementation is certainly the decision which hasher to use; this decision is of course made with the key type in mind. In the implementation given below I assume the key is represented as a long. In the example on GitHub I provide three implementations, based on Murmur hash, java.util.Random and a basic string hasher.
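
As a rough illustration of the java.util.Random variant (not the project’s actual code): the key seeds the generator, which then reproducibly yields the k bit positions for that key.

import java.util.BitSet;
import java.util.Random;

// Sketch of a Random-based hasher for long keys: seed with the key,
// draw k positions inside the bit vector.
public class RandomHasher {
    public static int[] hash(long key, int numHashes, int filterBits) {
        Random random = new Random(key);
        int[] positions = new int[numHashes];
        for (int i = 0; i < numHashes; i++) {
            positions[i] = random.nextInt(filterBits);
        }
        return positions;
    }

    public static void main(String[] args) {
        BitSet filter = new BitSet(8 * 1024 * 1024);    // 8M-bit filter
        for (int pos : hash(42L, 6, filter.size())) {
            filter.set(pos);                            // add key 42 to the filter
        }
        System.out.println("bits set for key 42: " + filter.cardinality());
    }
}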

Also, BloomFilterExample contains two ways of shipping the Bloom filter to the map tasks. In the first, the job configuration is used. This is just an example; in real usage it would be inappropriate for a filter with a larger footprint. For the second, I chose the Distributed Cache feature of Hadoop.
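
Roughly, the driver side of the two variants looks like this; the property name and method names are illustrative (the bundled scripts wrap the real job setup in the project):

import java.net.URI;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.Job;

// Two ways to ship a pre-built Bloom filter to the map tasks (CDH3-era API).
public class FilterShipping {

    // Variant 1: put the serialized filter straight into the job configuration.
    // Fine as a demo, but inappropriate once the filter grows past a few kilobytes,
    // because the whole configuration is copied to every task.
    public static void shipViaConfiguration(Job job, String serializedFilter) {
        job.getConfiguration().set("bloom.filter.serialized", serializedFilter);
    }

    // Variant 2: register the filter file (already written to HDFS) in the
    // Distributed Cache; every task node pulls a local copy before the mappers start.
    public static void shipViaDistributedCache(Job job, String hdfsFilterPath) throws Exception {
        DistributedCache.addCacheFile(new URI(hdfsFilterPath), job.getConfiguration());
    }
}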

Two tests are also included, one unit test and one performance test. The performance test shows pretty much the same results for the Murmur hash and java.util.Random implementations. The table shows false positive rates on 1M records depending on the number of bits used per element (from this and the number of elements, the number of hash functions is calculated):

number of bits per element   Java Random   Murmur hash   String hash
2                            39%           39%           39%
4                            14%           14%           31%
8                            2%            2%            25%
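
For Murmur hash and java.util.Random these numbers line up well with the theory: with m/n bits per element and the near-optimal number of hash functions k ≈ (m/n) · ln 2, the expected false positive rate is roughly (1 − e^(−kn/m))^k ≈ 0.6185^(m/n), i.e. about 38% for 2 bits, 15% for 4 bits and 2% for 8 bits per element. The String hash column stays well above that, which suggests its values are not spread uniformly enough over the filter.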

To run everything, check out the code from GitHub and do

mvn clean package

This should compile everything and run the unit test. For the performance test:

mvn -Dtest=com.busywait.bloomfilter.BloomFilterPerformanceTest test

To run the MapReduce Bloom example that uses the job configuration:

bin/bloom_configuration.sh

For the MapReduce Bloom example based on the Distributed Cache:

bloom_distributedcache.sh

The MapReduce examples are set up to run on a localhost Hadoop CDH3u0 cluster with the configuration in /etc/hadoop/conf/. Output will be saved to hdfs://localhost/temp/output_configuration and hdfs://localhost/temp/output_distcache; input is generated automatically inside the hdfs://localhost/temp/ dir.

Useful links:

Wiki page
Greplin Bloom filter implementation
Java Bloom filter implementation
A decent stand-alone Java Bloom Filter implementation
