What is Map/Reduce?
Map is a function that applies another function to all the items on a list, to produce another list with all the return values on it. (Another way of saying "apply f to x" is "call f, passing it x". So sometimes it sounds nicer to say "apply" instead of "call".)
This is how map is probably written in C# (it's called Select
and is in the standard library):
public static IEnumerable<R> Select<T, R>(this IEnumerable<T> list, Func<T, R> func)
{
foreach (T item in list)
yield return func(item);
}
As you're a Java dude, and Joel Spolsky likes to tell GROSSLY UNFAIR LIES about how crappy Java is (actually, he's not lying, it is crappy, but I'm trying to win you over), here's my very rough attempt at a Java version (I have no Java compiler, and I vaguely remember Java version 1.1!):
// represents a function that takes one arg and returns a result
public interface IFunctor
{
object invoke(object arg);
}
public static object[] map(object[] list, IFunctor func)
{
object[] returnValues = new object[list.length];
for (int n = 0; n < list.length; n++)
returnValues[n] = func.invoke(list[n]);
return returnValues;
}
I'm sure this can be improved in a million ways. But it's the basic idea.
Reduce is a function that turns all the items on a list into a single value. To do this, it needs to be given another function func
that turns two items into a single value. It would work by giving the first two items to func
. Then the result of that along with the third item. Then the result of that with the fourth item, and so on until all the items have gone and we're left with one value.
In C# reduce is called Aggregate
and is again in the standard library. I'll skip straight to a Java version:
// represents a function that takes two args and returns a result
public interface IBinaryFunctor
{
object invoke(object arg1, object arg2);
}
public static object reduce(object[] list, IBinaryFunctor func)
{
if (list.length == 0)
return null; // or throw something?
if (list.length == 1)
return list[0]; // just return the only item
object returnValue = func.invoke(list[0], list[1]);
for (int n = 1; n < list.length; n++)
returnValue = func.invoke(returnValue, list[n]);
return returnValue;
}
These Java versions need generics adding to them, but I don't know how to do that in Java. But you should be able to pass them anonymous inner classes to provide the functors:
string[] names = getLotsOfNames();
string commaSeparatedNames = (string)reduce(names,
new IBinaryFunctor {
public object invoke(object arg1, object arg2)
{ return ((string)arg1) + ", " + ((string)arg2); }
}
Hopefully generics would get rid of the casts. The typesafe equivalent in C# is:
string commaSeparatedNames = names.Aggregate((a, b) => a + ", " + b);
Why is this "cool"? Simple ways of breaking up larger calculations into smaller pieces, so they can be put back together in different ways, are always cool. The way Google applies this idea is to parallelization, because both map and reduce can be shared out over several computers.
But the key requirement is NOT that your language can treat functions as values. Any OO language can do that. The actual requirement for parallelization is that the little func
functions you pass to map and reduce must not use or update any state. They must return a value that is dependent only on the argument(s) passed to them. Otherwise, the results will be completely screwed up when you try to run the whole thing in parallel.
From the abstract of Google's MapReduce research publication page:
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.
The advantage of MapReduce is that the processing can be performed in parallel on multiple processing nodes (multiple servers) so it is a system that can scale very well.
Since it's based from the functional programming model, the map
and reduce
steps each do not have any side-effects (the state and results from each subsection of a map
process does not depend on another), so the data set being mapped and reduced can each be separated over multiple processing nodes.
Joel's Can Your Programming Language Do This? piece discusses how understanding functional programming was essential in Google to come up with MapReduce, which powers its search engine. It's a very good read if you're unfamiliar with functional programming and how it allows scalable code.
See also: Wikipedia: MapReduce
Related question: Please explain mapreduce simply
After getting most frustrated with either very long waffley or very short vague blog posts I eventually discovered this very good rigorous concise paper.
Then I went ahead and made it more concise by translating into Scala, where I've provided the simplest case where a user simply just specifies the map
and reduce
parts of the application. In Hadoop/Spark, strictly speaking, a more complex model of programming is employed that require the user to explicitly specify 4 more functions outlined here: http://en.wikipedia.org/wiki/MapReduce#Dataflow
import scalaz.syntax.id._
trait MapReduceModel {
type MultiSet[T] = Iterable[T]
// `map` must be a pure function
def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
(data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] =
data.flatMap(map)
def shufflePhase[K2, V2](mappedData: MultiSet[(K2, V2)]): Map[K2, MultiSet[V2]] =
mappedData.groupBy(_._1).mapValues(_.map(_._2))
// `reduce` must be a monoid
def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
(shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
shuffledData.flatMap(reduce).map(_._2)
def mapReduce[K1, K2, V1, V2, V3](data: MultiSet[(K1, V1)])
(map: ((K1, V1)) => MultiSet[(K2, V2)])
(reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)]): MultiSet[V3] =
mapPhase(map)(data) |> shufflePhase |> reducePhase(reduce)
}
// Kinda how MapReduce works in Hadoop and Spark except `.par` would ensure 1 element gets a process/thread on a cluster
// Furthermore, the splitting here won't enforce any kind of balance and is quite unnecessary anyway as one would expect
// it to already be splitted on HDFS - i.e. the filename would constitute K1
// The shuffle phase will also be parallelized, and use the same partition as the map phase.
abstract class ParMapReduce(mapParNum: Int, reduceParNum: Int) extends MapReduceModel {
def split[T](splitNum: Int)(data: MultiSet[T]): Set[MultiSet[T]]
override def mapPhase[K1, K2, V1, V2](map: ((K1, V1)) => MultiSet[(K2, V2)])
(data: MultiSet[(K1, V1)]): MultiSet[(K2, V2)] = {
val groupedByKey = data.groupBy(_._1).map(_._2)
groupedByKey.flatMap(split(mapParNum / groupedByKey.size + 1))
.par.flatMap(_.map(map)).flatten.toList
}
override def reducePhase[K2, V2, V3](reduce: ((K2, MultiSet[V2])) => MultiSet[(K2, V3)])
(shuffledData: Map[K2, MultiSet[V2]]): MultiSet[V3] =
shuffledData.map(g => split(reduceParNum / shuffledData.size + 1)(g._2).map((g._1, _)))
.par.flatMap(_.map(reduce))
.flatten.map(_._2).toList
}