
Functional Big Data

Everything old is new again: functional programming discussions dominate many fora and conferences, new Big Data frameworks and projects use it, and new or formerly obscure languages are coming to the fore. Big Data has been a fairly nebulous term, but one thing is certain: it was largely popularized by Apache Hadoop. Hadoop, in turn, was influenced by the publication of Google’s MapReduce paper, whose name combines “map” and “reduce,” the primitive operations from Lisp. This popular abstraction has tied Big Data to functional programming, despite the latter having a much longer history outside of common practice.

Without going into too much detail, the map operation somehow divides or transforms an input set of data, and the reduce operation then combines associated sets of intermediate data into a smaller set of outputs. The “somehow” is key: map-reduce is just a technique, a combination of two rather simple directives, and what these operations should actually do is entirely up to the programmer.

For instance, take the following table:

[sourcecode]
| Name | Location | # Children |
| Andy | Chicago | 3 |
| Bob | Boston | 1 |
| Sam | New York | 0 |
| Bob | Chicago | 2 |
[/sourcecode]

Treating each of these table rows as a separate input into the map operation, we could transform the table into the following:

[sourcecode]
| Location | # Children |
| Chicago | 3 |
| Boston | 1 |
| New York | 0 |
| Chicago | 2 |
[/sourcecode]

Keep in mind that the original table still exists as the input; it has only been transformed into an intermediate set of data, independent of the input set. Now that the data is in a more suitable shape, we can reduce it by summing and averaging the number of children per city:

[sourcecode]
| Location | # Children | Average # Children |
| Chicago | 5 | 2.5 |
| Boston | 1 | 1 |
| New York | 0 | 0 |
[/sourcecode]

Finally, operating on this new intermediate set, we can retrieve the city and total number of children where the average is above two with a filter:

[sourcecode]
| Location | # Children |
| Chicago | 5 |
[/sourcecode]

That is the map-reduce paradigm in a nutshell.
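
As a rough sketch, the three steps above could be expressed with Java streams (the `Row` record and the `totalsAboveAverage` helper are illustrative names made up for this example, not part of any framework):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ChildrenByCity {
    // Hypothetical record standing in for one row of the example table.
    record Row(String name, String location, int children) {}

    static final List<Row> TABLE = List.of(
            new Row("Andy", "Chicago", 3),
            new Row("Bob", "Boston", 1),
            new Row("Sam", "New York", 0),
            new Row("Bob", "Chicago", 2));

    // Map each row to (location, children), reduce by summing per location,
    // then filter to locations whose average number of children exceeds the threshold.
    static Map<String, Integer> totalsAboveAverage(double threshold) {
        Map<String, Integer> totals = TABLE.stream()
                .collect(Collectors.groupingBy(Row::location,
                        Collectors.summingInt(Row::children)));
        Map<String, Double> averages = TABLE.stream()
                .collect(Collectors.groupingBy(Row::location,
                        Collectors.averagingInt(Row::children)));
        return totals.entrySet().stream()
                .filter(e -> averages.get(e.getKey()) > threshold)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        System.out.println(totalsAboveAverage(2.0)); // {Chicago=5}
    }
}
```

Only Chicago survives the filter, since its average of 2.5 children is the only one above two.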

This method is generally used on embarrassingly parallel workloads. Such workloads process data sets where each data point is independent of all other data points, at least as far as the analysis being performed is concerned (e.g., analysis of time series data where the time component is not relevant). In the map operation above, the processing of Andy does not rely upon the processing of Bob, and vice versa, since we are just transforming the information in a trivial fashion. (The reduce operation, which follows the initial and typically more expensive map operation, could potentially require a barrier.) Since the data points can be processed separately, they can easily be distributed to other nodes for simultaneous processing. In a perfect world, with n data points we could reduce the sequential processing time of the map operation from n to 1 by processing across n nodes at the same time.
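
To see how trivially an independent map step parallelizes, here is a small sketch using Java’s parallel streams (the `ParallelMap` class and its `locations` helper are hypothetical names for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ParallelMap {
    // Because each row is independent, the map step can run on the common
    // fork/join pool with no coordination between elements; collect() still
    // preserves the encounter order of the source list.
    static List<String> locations(List<String> rows) {
        return rows.parallelStream()
                .map(row -> row.split(",")[1])
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(locations(List.of(
                "Andy,Chicago,3", "Bob,Boston,1", "Sam,New York,0", "Bob,Chicago,2")));
        // [Chicago, Boston, New York, Chicago]
    }
}
```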

This is all fine and dandy, but how does functional programming really affect the bottom line? While there are many benefits, some of the most significant ones were touched on earlier: transformations and immutability.

If you can envision your process as a pipeline, you can implement it as a chain of functions, each feeding the next, preferably avoiding any intermediate state, until you achieve the desired output. Each of the map and reduce operations simply transforms the data, generating an entirely new output based on the input. Each of those transformations must be a “pure” function: it must have no side effects (it neither affects nor is affected by the execution of other parts of the system) and it must be stateless (it must not affect future executions of itself). If you will, imagine each of these steps as a separate function. For the above example:

[sourcecode]
result = filter(reduce(map()))
[/sourcecode]

or

[sourcecode]
result = map | reduce | filter
[/sourcecode]
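
In Java, such a pipeline can be sketched by composing `Function` instances with `andThen`; the stage implementations below are made up purely for illustration:

```java
import java.util.List;
import java.util.function.Function;

public class Pipeline {
    // Each stage is a pure function; andThen() chains them so the output of
    // one stage feeds the next, with no intermediate state of our own.
    static String run(List<Integer> xs) {
        Function<List<Integer>, List<Integer>> map =
                in -> in.stream().map(x -> x * 2).toList();
        Function<List<Integer>, Integer> reduce =
                in -> in.stream().reduce(0, Integer::sum);
        Function<Integer, String> filter =
                total -> total > 10 ? "big" : "small";

        return map.andThen(reduce).andThen(filter).apply(xs);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3))); // 1,2,3 -> 2,4,6 -> 12 -> big
    }
}
```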

Closely tied to transformations is the concept of immutability, where “variables” are assigned values only once. This is perhaps the core benefit of functional programming in this context. When there is no shared, mutable state between processes, the need for complicated synchronization and for full knowledge of all possible program states is obviated. Shared mutable state is the source of many software defects; without it, the programmer is free to think about the actual problem at hand rather than the machinery for making it happen.
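
A tiny sketch of the idea in Java (the `doubleAll` helper is a hypothetical example): transformations build new collections, leaving the input untouched.

```java
import java.util.List;
import java.util.stream.Collectors;

public class Immutability {
    // The transformation builds an entirely new list; the input is never
    // mutated, so no synchronization is needed for concurrent readers.
    static List<Integer> doubleAll(List<Integer> xs) {
        return xs.stream().map(x -> x * 2).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> original = List.of(1, 2, 3); // List.of is unmodifiable
        List<Integer> doubled = doubleAll(original);

        System.out.println(original); // [1, 2, 3] -- unchanged
        System.out.println(doubled);  // [2, 4, 6]
        // original.add(4); // would throw UnsupportedOperationException
    }
}
```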

Let’s look at a tiny example to illustrate these points, one where we want to find all last names that sound like a target last name. Since all of the inputs are independent, we can parallelize the search. A nicely contrived imperative implementation might look like the following:

[sourcecode language="java"]
        private String encoder(String str) {
                return new DoubleMetaphone().doubleMetaphone(str);
        }

        private List<String> imperative1(String needle, List<String> haystack) {
                final String needleCode = encoder(needle);

                final List<String> results = new ArrayList<>();

                final ExecutorService executorService = ForkJoinPool.commonPool();

                final List<Future<?>> futures = new ArrayList<>();

                for (final String row : haystack.subList(1, haystack.size())) {
                        futures.add(executorService.submit(new Runnable() {

                                @Override
                                public void run() {
                                        final String surname = row.split(",")[0];

                                        if (needleCode.equals(encoder(surname))) {
                                                synchronized (results) {
                                                        results.add(surname);
                                                }
                                        }
                                }

                        }));
                }

                for (final Future<?> future : futures) {
                        try {
                                future.get();
                        } catch (InterruptedException | ExecutionException e) {
                                throw new RuntimeException(e);
                        }
                }

                Collections.sort(results);

                return results;
        }
[/sourcecode]

We have two methods, a lot of boilerplate code for the loops, and a synchronized block for collecting the results. To be a little fairer, we can use a Callable and avoid the synchronization around our shared state:

[sourcecode language="java"]
        private String encoder(String str) {
                return new DoubleMetaphone().doubleMetaphone(str);
        }

        private List<String> imperative2(String needle, List<String> haystack) {
                final String needleCode = encoder(needle);

                final List<String> results = new ArrayList<>();

                final ExecutorService executorService = ForkJoinPool.commonPool();

                final List<Future<String>> futures = new ArrayList<>();

                for (final String row : haystack.subList(1, haystack.size())) {
                        futures.add(executorService.submit(new Callable<String>() {

                                @Override
                                public String call() {
                                        final String surname = row.split(",")[0];

                                        return needleCode.equals(encoder(surname)) ? surname : null;
                                }

                        }));
                }

                for (final Future<String> future : futures) {
                        try {
                                final String surname = future.get();

                                if (surname != null) {
                                        results.add(surname);
                                }
                        } catch (InterruptedException | ExecutionException e) {
                                throw new RuntimeException(e);
                        }
                }

                Collections.sort(results);

                return results;
        }
[/sourcecode]

That still doesn’t clean things up with regard to the “what” we are doing versus the “how” we are doing it. It has made things a little better, and it’s likely to perform better, but it’s still a lot of cruft. Now let’s look at a functional implementation:

[sourcecode language="java"]
        private List<String> functional(String needle, List<String> haystack) {
                final Function<String, String> encoder = str -> new DoubleMetaphone()
                        .doubleMetaphone(str);

                final String needleCode = encoder.apply(needle);

                return haystack.parallelStream().skip(1).map(row -> row.split(",")[0])
                        .filter(surname -> needleCode.equals(encoder.apply(surname)))
                        .sorted().collect(Collectors.toList());
        }
[/sourcecode]

We’ve reduced all of that to one method with three statements. Granted, a lot is happening in the last statement, but it reads more like natural language and encompasses all of the same functionality as the earlier implementations. We have simple lambda functions that are easy to reason about and test. We don’t need to worry about side effects or synchronization, because all of our operations are free of side effects. We do share needleCode, but it is immutable. We have fewer lines to test, so there is less room for defects. On top of all that, it is faster:

[sourcecode]
Functional: Found 42 similar names in an average of 66106us: [KNADLE, KNETTEL, KNIGHTLY, KNITTEL, KNITTLE, KNODEL, KNODELL, KNODLE, NADAL, NADEL, NADELL, NADELLA, NATAL, NATALE, NATALI, NATALIE, NATELLI, NATIELLO, NATOLA, NATOLE, NATOLI, NEDELL, NEDLEY, NEEDELL, NEEDLE, NEIDEL, NETTELL, NETTLE, NITTOLI, NITTOLO, NODAL, NODELL, NOTLEY, NOTTOLI, NUDEL, NUDELL, NUTALL, NUTILE, NUTLEY, NUTTALL, NUTTLE, NUTWELL]
Imperative 1: Found 42 similar names in an average of 96349us: [KNADLE, KNETTEL, KNIGHTLY, KNITTEL, KNITTLE, KNODEL, KNODELL, KNODLE, NADAL, NADEL, NADELL, NADELLA, NATAL, NATALE, NATALI, NATALIE, NATELLI, NATIELLO, NATOLA, NATOLE, NATOLI, NEDELL, NEDLEY, NEEDELL, NEEDLE, NEIDEL, NETTELL, NETTLE, NITTOLI, NITTOLO, NODAL, NODELL, NOTLEY, NOTTOLI, NUDEL, NUDELL, NUTALL, NUTILE, NUTLEY, NUTTALL, NUTTLE, NUTWELL]
Imperative 2: Found 42 similar names in an average of 87531us: [KNADLE, KNETTEL, KNIGHTLY, KNITTEL, KNITTLE, KNODEL, KNODELL, KNODLE, NADAL, NADEL, NADELL, NADELLA, NATAL, NATALE, NATALI, NATALIE, NATELLI, NATIELLO, NATOLA, NATOLE, NATOLI, NEDELL, NEDLEY, NEEDELL, NEEDLE, NEIDEL, NETTELL, NETTLE, NITTOLI, NITTOLO, NODAL, NODELL, NOTLEY, NOTTOLI, NUDEL, NUDELL, NUTALL, NUTILE, NUTLEY, NUTTALL, NUTTLE, NUTWELL]
[/sourcecode]

Once you get the hang of thinking about your processes in functional terms, whether you are specifying the process, using a Big Data framework, or writing parallel code from scratch, you can relatively easily reap the benefits of fewer defects with more concise code that takes less time to write. You don’t even need to learn a new language.

The source code for this post is available here.