Archive for September, 2007

More File Processing with Erlang

September 29th, 2007  |  Published in erlang

Tim has weighed in with another attempt at using Erlang for his Wide Finder Project, but he’s unhappy with his new solution too. I feel his pain; below, I’ll show another solution which improves on my earlier one, but I’m not entirely happy with this one either.

Tim’s assessment of my earlier solution is accurate. What was worst about my original approach was that it worked only by reading the whole file into memory, which obviously doesn’t work too well for the quarter-gigabyte log files Tim’s dealing with, unless you have a big machine with big memory. Tim also complains about the “number of processes” parameter, but as I’ll show at the very end of this blog entry, it’s not really needed, and it’s there only to allow for experimentation. And finally, Tim said he doesn’t like my ad hoc binary splitting algorithm, but he also points out that for this approach, it’s pretty much necessary, since Erlang doesn’t provide any modules supporting that.

So what improvements does this new version contain?

  • First, it addresses the file size issue by using klacke’s bfile module, which allows me to work with a full-size logfile rather than Tim’s original 10,000 line sample. If klacke hadn’t posted this to the erlang-questions mailing list, I wouldn’t have even tried to create a new solution. It’s a great module.
  • Second, it uses a two-level process hierarchy. I observed that with full-sized logfiles, the number of processes launched to perform matching could be quite large, and those processes would finish and seem to sit around waiting for the parent process to collect their results. Not surprisingly, the more processes the main program launched, the slower and slower it became. The two-level process hierarchy launches collectors whose sole job it is to launch processes to perform matching and then collect their results. This results in far fewer messages sitting around waiting for the main thread to collect them, and also allows for a higher degree of concurrency to be applied to both reading the file and collecting the results.
  • It still performs binary splitting into data chunks ending on newlines, but I think the algorithm is a little improved. Specifically, it accumulates the “leftovers” and passes them along to the next recursion, where they’re efficiently stuck onto the front of the next chunk. Coincidentally, Tim’s latest approach does something similar, but I don’t think it’s as efficient (but I didn’t try it to verify that, so I could be wrong).
  • Finally, at 84 lines including blank ones, the solution has remained relatively brief. This isn’t an improvement, but keeping the code short has been an unstated goal for me. After all, the brevity of the Ruby solution is pretty striking, plus if I have to write hundreds of lines of Erlang without achieving a significant speedup, I might as well do it in hundreds of lines of Java, C++, or C instead.

Regardless of these improvements, the best time I achieved with this new solution is 9.8 seconds on an 8-core 2.33 GHz dual Intel Xeon system with 8 GB of RAM. On my dual-core 2.33 GHz 2 GB MacBook Pro, it clocks in at just over 24 seconds. Still too slow.

I’ve been naming my Erlang modules after Tim, given that he’s the originator of this problem, and he’s responsible for my getting even less sleep than usual over the past week. :-) The module for the solution below is called tbray5. The module for my original solution was of course named just tbray. Don’t ask what happened to tbray1 through tbray4; let’s just say that failed experiments are useful too.

There was a tbray6 briefly as well, when I experimented with adding mmap capabilities to klacke’s bfile module. As I mentioned to Tim in email a few days ago, I was wondering whether one could just drop into C, mmap the large logfile into memory, and return a binary representing the whole file back to Erlang. Seems simple enough, and I got it working, but because of mmap’s alignment restrictions combined with the way the Erlang runtime allocates binaries, I was forced to copy the data into the binary, thus killing any performance benefits mmap might have provided.

Anyway, here’s tbray5.erl, and below, I’ll explain each section. Stop here if you’re not interested in the details.

Compiling and Running

Run the following command to compile the code:

erl -smp -make

To execute it, use one of the following command lines:

erl -smp -noshell -run tbray5 main <numProcs> <logfile>
erl -smp -noshell -run tbray5 main <logfile>

In these command lines, <numProcs> specifies both the number of processes to use for logfile analysis as well as the number of 1k blocks to read from the logfile at a time, and <logfile> specifies the name of the logfile to analyze. Use the first command line to experiment with the number of processes to launch and 1k blocks to read. I found that 512 procs/blocks seems to yield the fastest execution times, so the second command line above defaults to 512, but YMMV.

find_match

The find_match function below is the same as always:

find_match("/ongoing/When/" ++ Last) ->
    case lists:member($., Last) of
        false -> 1;
        true -> 0
    end;
find_match(_) -> 0.
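
To make the pattern match concrete, here is a small self-contained sketch that exercises the same clauses. The module name find_match_demo and the sample paths are made up for illustration; only find_match itself comes from the code above.

```erlang
-module(find_match_demo).
-export([find_match/1, demo/0]).

%% Same clauses as above: a path under /ongoing/When/ counts as a hit
%% only if the rest of the path contains no '.' character.
find_match("/ongoing/When/" ++ Last) ->
    case lists:member($., Last) of
        false -> 1;
        true -> 0
    end;
find_match(_) -> 0.

%% Hypothetical sample paths, not taken from Tim's logfile.
demo() ->
    [find_match("/ongoing/When/200x/2007/09/20/Wide-Finder"),
     find_match("/ongoing/When/200x/2007/09/20/picture.png"),
     find_match("/ongoing/ongoing.atom")].
```

Calling find_match_demo:demo() returns [1,0,0]: the first path matches the prefix and has no dot, the second matches the prefix but contains a dot, and the third falls through to the catch-all clause.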

process_binary

The process_binary function below, which launches “matcher” processes, is the same as before, too, except I switched from lists:foldl to lists:foldr because it seemed to provide a slight speedup. This function receives the ID of a process to send results to, and a string (as an Erlang binary) that’s assumed to end with a newline. It launches a process that breaks the binary into a list of strings, tokenizes each string, then counts the matches using find_match.

process_binary(Pid, Bin) ->
    spawn(
      fun() ->
              L = string:tokens(binary_to_list(Bin), "\n"),
              V = lists:foldr(
                    fun(Line, Total) ->
                            Tok = string:tokens(Line, " "),
                            Total + find_match(lists:nth(7, Tok))
                    end, 0, L),
              Pid ! V
      end).
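
The choice of lists:nth(7, Tok) depends on the layout of each log line. The sketch below shows how splitting on spaces lands the request path in the seventh position, assuming a line in Apache's combined log format. The module name, the path_of helper, and the sample line are all invented for illustration.

```erlang
-module(token_demo).
-export([path_of/1, demo/0]).

%% Split a log line on spaces and return the 7th token, which is the
%% request path when the line follows the layout:
%%   host ident user [date zone] "METHOD path protocol" status size
path_of(Line) ->
    Tok = string:tokens(Line, " "),
    lists:nth(7, Tok).

%% Hypothetical log line (not from Tim's data). Adjacent string
%% literals are concatenated by the compiler.
demo() ->
    Line = "example.org - - [29/Sep/2007:10:00:00 -0700] "
           "\"GET /ongoing/When/200x/2007/09/20/Wide-Finder HTTP/1.1\" "
           "200 1234",
    path_of(Line).
```

Note that lists:nth/2 fails on a line with fewer than seven tokens, so this approach relies on well-formed input.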

break_chunk_on_newline

The break_chunk_on_newline set of functions below breaks Erlang binaries read from the logfile into chunks that end with a newline. The first variant handles the case where the binary is already smaller than the desired chunk size. It just returns a 2-tuple consisting of the list of all chunks obtained so far, along with the remainder as a binary. The second variant does most of the work, splitting the binary into chunks of the desired size and walking them along to ensure they end with newlines, and accumulating all the processed chunks into a list. The third variant just encapsulates the chunk size calculation and passes the initial empty chunk accumulator list.

break_chunk_on_newline(Bin, Pos, All) when (Pos >= size(Bin)) -> {All, Bin};
break_chunk_on_newline(Bin, Pos, All) ->
    {_, <<C:8, _/binary>>} = split_binary(Bin, Pos),
    case C of
        $\n ->
            {Ba, Bb} = split_binary(Bin, Pos+1),
            break_chunk_on_newline(Bb, Pos, All ++ [Ba]);
        _ -> break_chunk_on_newline(Bin, Pos+1, All)
    end.
break_chunk_on_newline(Bin, N) -> break_chunk_on_newline(Bin, size(Bin) div N, []).
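
A tiny worked example may help here. The sketch below copies the functions above into a throwaway module (chunk_demo, a name chosen only for this illustration) and runs them on an eight-byte binary.

```erlang
-module(chunk_demo).
-export([break_chunk_on_newline/2, demo/0]).

%% Copied from the functions above so the example is self-contained.
break_chunk_on_newline(Bin, Pos, All) when (Pos >= size(Bin)) -> {All, Bin};
break_chunk_on_newline(Bin, Pos, All) ->
    {_, <<C:8, _/binary>>} = split_binary(Bin, Pos),
    case C of
        $\n ->
            {Ba, Bb} = split_binary(Bin, Pos+1),
            break_chunk_on_newline(Bb, Pos, All ++ [Ba]);
        _ -> break_chunk_on_newline(Bin, Pos+1, All)
    end.
break_chunk_on_newline(Bin, N) -> break_chunk_on_newline(Bin, size(Bin) div N, []).

%% Eight bytes asked to split in two: the starting position is
%% 8 div 2 = 4, which lands on $d, so the split point walks forward to
%% the newline at offset 5. <<"ef">> has no trailing newline and comes
%% back as the remainder.
demo() ->
    break_chunk_on_newline(<<"ab\ncd\nef">>, 2).
```

Here chunk_demo:demo() returns {[<<"ab\ncd\n">>], <<"ef">>}: one newline-terminated chunk plus the leftover that the caller is expected to prepend to the next read.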

spawn_collector

The spawn_collector function below just spawns a function that collects match counts from process_binary processes, and then sends the total matches to another process. It takes a list of binaries as an argument and calls process_binary for each one, passing the collector process ID to each, and then it returns the collector process ID. The two-level process hierarchy I talked about above has collectors at the first level and “matcher” processes, spawned by the collectors, at the second level.

spawn_collector(Bins, Me) ->
    Collector = spawn(
                  fun() ->
                          V = lists:foldr(fun(_, T) -> receive V -> T + V end end, 0, Bins),
                          Me ! V
                  end),
    [process_binary(Collector, B) || B <- Bins],
    Collector.
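
One thing to be aware of is that the collector's receive accepts any message that arrives. As a variation, and not what tbray5 does, the sketch below tags each result with a unique reference so that unrelated messages in a mailbox are never mistaken for counts. The module name, the use of make_ref(), and the stand-in workload (counting newline bytes instead of calling find_match) are all assumptions made for illustration.

```erlang
-module(collector_demo).
-export([spawn_collector/2, demo/0]).

%% Stand-in for the matcher: count newline bytes in a binary.
count_newlines(Bin) ->
    length([C || C <- binary_to_list(Bin), C =:= $\n]).

%% Second level: one worker per binary, reporting to the collector.
spawn_worker(Collector, Tag, Bin) ->
    spawn(fun() -> Collector ! {Tag, count_newlines(Bin)} end).

%% First level: the collector sums one tagged result per binary and
%% forwards the total to Parent under the same tag.
spawn_collector(Bins, Parent) ->
    Tag = make_ref(),
    Collector = spawn(
                  fun() ->
                          Total = lists:foldl(
                                    fun(_, T) ->
                                            receive {Tag, V} -> T + V end
                                    end, 0, Bins),
                          Parent ! {Tag, Total}
                  end),
    [spawn_worker(Collector, Tag, B) || B <- Bins],
    {Collector, Tag}.

%% Two binaries containing three newlines in total.
demo() ->
    {_Pid, Tag} = spawn_collector([<<"a\nb\n">>, <<"c\n">>], self()),
    receive {Tag, Total} -> Total end.
```

The shape is the same two-level hierarchy: the caller only ever receives one message per collector, no matter how many workers that collector launched.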

scan_finish

The scan_finish set of functions below handles the remainder binary, the last one after all file reading and binary splitting is done. It ensures that a collector is spawned to handle the remainder, if there is one. The first variant is called if the remainder is empty, the second one otherwise.

scan_finish(<<>>, _, Pids) -> Pids;
scan_finish(More, Me, Pids) -> [spawn_collector([More], Me) | Pids].

scan_file

The scan_file set of functions below reads chunks of the logfile via bfile:fread, breaks each chunk via break_chunk_on_newline, and spawns collectors to process them. It handles any remainder binaries by prepending them to the front of the next chunk, or when the file is completely read, by passing any remainders to scan_finish. Note that the first variant of scan_file does all the work; the second one just initializes the recursion. The return value of scan_file is the list of collector process IDs.

scan_file(F, N, Readsize, Me, Leftover, Pids) ->
    Rd = bfile:fread(F, Readsize),
    case Rd of
        {ok, Bin} ->
            {Bins, More} = break_chunk_on_newline(list_to_binary([Leftover, Bin]), N),
            scan_file(F, N, Readsize, Me, More, [spawn_collector(Bins, Me) | Pids]);
        eof -> scan_finish(Leftover, Me, Pids)
    end.
scan_file(F, N, Readsize) ->
    scan_file(F, N, Readsize, self(), <<>>, []).
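
Since bfile is a separate driver rather than part of the standard distribution, here is a minimal sketch of the same leftover-prepending idea using only the standard file module and, on OTP releases that ship it, binary:split/3. It simply counts lines instead of spawning collectors, and the module and function names are invented for illustration.

```erlang
-module(leftover_demo).
-export([count_lines/2]).

%% Read File in Readsize-byte chunks and count its lines. Whatever
%% follows the last newline in a chunk is carried forward and prepended
%% to the next chunk, as scan_file does with its Leftover argument.
count_lines(File, Readsize) ->
    {ok, F} = file:open(File, [read, raw, binary]),
    try
        loop(F, Readsize, <<>>, 0)
    after
        file:close(F)
    end.

loop(F, Readsize, Leftover, Count) ->
    case file:read(F, Readsize) of
        {ok, Bin} ->
            Parts = binary:split(<<Leftover/binary, Bin/binary>>,
                                 <<"\n">>, [global]),
            %% The last element is the (possibly empty) unterminated tail.
            {Lines, [More]} = lists:split(length(Parts) - 1, Parts),
            loop(F, Readsize, More, Count + length(Lines));
        eof ->
            %% A non-empty tail is a final line with no trailing newline.
            case Leftover of
                <<>> -> Count;
                _ -> Count + 1
            end
    end.
```

A deliberately small Readsize, such as 5 bytes, is a handy way to check that lines spanning chunk boundaries are still counted exactly once.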

start

The start functions below initialize bfile, call scan_file, and then collect results from the collector processes. The second variant sets the number of bytes to read at a time from the logfile to a default of 512 1k blocks.

start(Num, File, Readsize) ->
    bfile:load_driver(),
    {ok, F} = bfile:fopen(File, "r"),
    Pids = scan_file(F, Num, Readsize),
    bfile:fclose(F),
    lists:foldr(fun(_, T) -> receive V -> T + V end end, 0, Pids).
start(Num, File) ->
    start(Num, File, 512*1024).

main

And finally, the main functions below handle invocations from the shell command line, as explained earlier. The second variant runs a list of values used for the number of processes and the number of bytes to read from the logfile at a time, and prints out a list consisting of each value and the number of seconds it took to execute for that value.

main([N, F]) ->
    io:format("~p matches found~n", [start(list_to_integer(N), F)]),
    halt();
main([F]) ->
    Sz = [16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384],
    Results = lists:map(
      fun(S) ->
              Start = now(),
              start(S, F, S*1024),
              {S, time_diff(Start, now())}
      end,
      Sz),
    io:format("~p~n", [Results]),
    halt().

The time_diff function (not shown but included in tbray5.erl), which I borrowed from somewhere a few months back, just helps calculate the execution times for the second variant of main shown above.
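
For readers without tbray5.erl at hand, a helper along these lines would do the job. This is a hypothetical stand-in, not the actual time_diff from the file; it assumes both arguments are {MegaSecs, Secs, MicroSecs} tuples of the kind now() returns and that the result should be seconds as a float.

```erlang
-module(time_diff_demo).
-export([time_diff/2]).

%% Hypothetical stand-in for the time_diff helper that is not shown in
%% the post. Takes a start and an end timestamp, each in the
%% {MegaSecs, Secs, MicroSecs} form, and returns elapsed seconds.
time_diff({M1, S1, U1}, {M2, S2, U2}) ->
    ((M2 - M1) * 1000000) + (S2 - S1) + ((U2 - U1) / 1000000).
```

For example, time_diff_demo:time_diff({0, 10, 0}, {0, 12, 500000}) returns 2.5. The standard library's timer:now_diff/2 computes the same difference in microseconds and would serve equally well.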

QCon SOA Track

September 26th, 2007  |  Published in conferences

Stefan Tilkov emailed me today about the SOA Track at QCon San Francisco coming up in early November. He’s hosting that track, so he was emailing all the speakers to let us all know the speaker order. His list of speakers is very impressive:

  • Sanjiva Weerawarana will attempt to debunk myths surrounding “REST vs WS-*.”
  • Pete Lacey will talk about REST’s architectural constraints and how they support all the desired -ilities of distributed systems, and contrast them against SOA’s lack of constraints.
  • Dan Diephouse will cover the Atom Publishing Protocol and how to use it to build services.
  • Jim Webber will cover the use of REST for enterprise integration.

I’ll be starting the track off after Stefan’s introduction with my REST Eye for the SOA Guy presentation.

I’ve spoken at more than my share of conferences over the years, and after a while you kind of get numb to it, but not this time. In all honesty, I think this could very well be the strongest track of speakers I’ve ever been lucky enough to be a part of. All these guys have significant experience with building real-world systems and they have tremendous insights into both REST and SOA. I was already looking forward to QCon, given that it’s a direct spinoff of my most favorite conference, but after Stefan’s email reminded me of this “who’s who” list of speakers, I can’t wait to get there. I hope to see you there too!

Tim Bray and Erlang

September 23rd, 2007  |  Published in erlang

Tim’s playing with Erlang, trying to rewrite a Ruby program for analyzing his website logs. His initial reaction appeared to be one of disgust, since his Erlang program was an order of magnitude slower than the Ruby version. Thankfully, though, a bunch of people have since jumped in and made the solution more palatable (check the comments on his posting).

Reading between the lines, it seems that Tim was hoping to take advantage of Erlang’s concurrency to put his multicore machines to work analyzing his logs. Unfortunately, none of the answers posted in the blog comments seem to provide that, so I decided to take a crack at it myself.

First I wrote a tiny little Erlang program to read in Tim’s entire sample logfile using file:read_file, turn the result into a list of strings using binary_to_list, and then process the strings. Simple timings showed the binary_to_list call to be the slowest part, so I decided to throw Erlang’s multiprocess capability at it. If you’re familiar with pthreads or Java threads, think of an Erlang “process” as basically a very, very lightweight thread.

Here’s my solution:

-module(tbray).
-export([start/2]).

find_match("/ongoing/When/" ++ Last) ->
    case lists:member($., Last) of
        false -> 1;
        true -> 0
    end;
find_match(_) -> 0.

process_binary(Pid, Bin) ->
    spawn(fun() ->
        L = string:tokens(binary_to_list(Bin), "\n"),
        V = lists:foldl(
            fun(Line, Total) ->
                Total + find_match(lists:nth(7, string:tokens(Line, " "))) end,
            0, L),
        Pid ! V
        end).

split_on_newline(Bin, N, All) when size(Bin) < N ->
    All ++ [Bin];
split_on_newline(Bin, N, All) ->
    {_, <<C:8, _/binary>>} = split_binary(Bin, N),
    case C of
        $\n ->
          {B21, B22} = split_binary(Bin, N+1),
          split_on_newline(B22, N, All ++ [B21]);
        _ -> split_on_newline(Bin, N+1, All)
    end.
split_on_newline(Bin, N) when N == size(Bin) -> [Bin];
split_on_newline(Bin, N) -> split_on_newline(Bin, N, []).

start(Num, Input) ->
    {ok, Data} = file:read_file(Input),
    Bins = split_on_newline(Data, size(Data) div Num),
    Me = self(),
    Pids = [process_binary(Me, B) || B <- Bins],
    lists:foldl(
        fun(_, Total) -> receive X -> Total + X end end,
        0, Pids).

The way this solution works is that it uses multiple Erlang processes to convert chunks of the input file to lists of strings and process them for matches. Begin with the start/2 function at the very bottom. First, we read the file in one shot, then split it into Num chunks, with the split_on_newline function variants being mindful to end each chunk on a newline character so we don’t split lines across chunks. We then pass each chunk to the process_binary/2 function using a list comprehension. Each process_binary/2 call spawns a new process to first convert its chunk to a list of strings and then process those strings for matches.

Now let’s time it. My MacBook Pro has two cores, so let’s enable SMP, and bump the Erlang process limit up to 60,000. First, we’ll compile the module and time it with just a single process:

$ erl -smp enable +P 60000
1> c(tbray).
{ok,tbray}
2> timer:tc(tbray, start, [1, "o10k.ap"]).
{661587,1101}

OK, at 0.66 seconds, we’re already a lot faster than Tim’s approach (the second value, 1101, is the number of matches we found), but can it go faster? Let’s try 2 processes:

3> timer:tc(tbray, start, [2, "o10k.ap"]).
{472067,1101}

That dropped us to 0.47 seconds, which is not an insignificant speedup. Do more processes help?

4> timer:tc(tbray, start, [4, "o10k.ap"]).
{390786,1101}

Yes, at 4 processes we drop to 0.39 seconds. Let’s go up a few orders of magnitude:

5> timer:tc(tbray, start, [40, "o10k.ap"]).
{380753,1101}
6> timer:tc(tbray, start, [400, "o10k.ap"]).
{322979,1101}
7> timer:tc(tbray, start, [4000, "o10k.ap"]).
{316857,1101}
8> timer:tc(tbray, start, [40000, "o10k.ap"]).
{318153,1101}

As we increase the number of Erlang processes, our performance improves, up to a point. At 40,000 processes we’re slower than we were at 4000. Maybe there’s a better number in between? It turns out that despite the numbers listed above, once you get above 400 processes or so, the numbers remain about the same. The best I got on my MacBook Pro after numerous runs was 0.301 seconds with 2400 processes, but the average best seems to be about 0.318 seconds. The performance of this approach comes pretty close to other solutions that rely on external non-Erlang assistance, at least for Tim’s sample dataset on this machine.

I also tried it on an 8-core (2 Intel Xeon E5345 CPUs) 64-bit Dell box running Linux, and it clocked in at 0.126 seconds with 2400 processes, and I saw 0.124 seconds with 1200 processes. I believe this utilization of multiple cores was exactly what Tim was looking for.

If you’re a Java or C++ programmer, note the ease with which we can spawn Erlang processes and have them communicate, and note how quickly we can launch thousands of processes. This is what Tim was after, I believe, so hopefully my example provides food for thought in that area. BTW, I’m no Erlang expert, so if anyone wants to suggest improvements to what I’ve written, please feel free to comment here.
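
To try the spawn-and-communicate pattern on its own, without any log parsing, a minimal sketch might look like the following. The module and function names are made up for illustration, and the workload (each process sends its own index back) is only a placeholder.

```erlang
-module(spawn_demo).
-export([fan_out/1, timed/1]).

%% Spawn N processes; each sends its own index back to the caller,
%% which adds up the N replies. The reference keeps these replies
%% separate from anything else in the caller's mailbox.
fan_out(N) ->
    Me = self(),
    Tag = make_ref(),
    [spawn(fun() -> Me ! {Tag, I} end) || I <- lists:seq(1, N)],
    lists:foldl(fun(_, Sum) -> receive {Tag, X} -> Sum + X end end,
                0, lists:seq(1, N)).

%% Wrap fan_out/1 in timer:tc/3, which returns {Microseconds, Result}.
timed(N) ->
    timer:tc(?MODULE, fan_out, [N]).
```

For instance, spawn_demo:timed(10000) returns a tuple whose second element is 50005000, the sum of 1 through 10000; the first element is the elapsed time in microseconds and will vary by machine.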

Rejoining the blogosphere

September 22nd, 2007  |  Published in General

Welcome to my new blog! When I left IONA, I figured it would be just a month or two before I could return to blogging. I guess it took a little longer than that, though, given that seven months have already gone by.

Many have asked what I’ve been up to for the past seven months. All I can tell you is that I can’t tell you. :-) I work for a startup in Westford, MA, USA, in an industry that’s totally different than the one I used to work in. On the surface it might seem daunting to leave an industry where you’re kind of considered an expert for one in which you’ve never worked before, but at the end of the day, software is software, and expertise extends farther than you might think.

One thing I can tell you is that I am having a blast. The team I work with is experienced and very strong. We’re all experienced enough that egos are largely left out of the office, so many of the typical bikeshed problems that plague groups of developers just don’t seem to happen. Well, OK, some do :-), but they seem to get solved quickly. In general, though, a lot seems to get done very quickly and at a very high quality. It’s hard work, and my days seem to regularly extend to 16-18 hours, but that’s exactly the type of situation in which I thrive.

The best part for me, though, is getting to work in multiple programming languages on a daily basis. Too many shops in the enterprise integration and web services world I left behind just want to write everything in Java. Zzzzz…snore. I never really warmed up to Java because I find it totally boring. To me, it’s like C++ with the fun bits taken out. If I want to use a C++-like language, I’ll use C++. OTOH if I want a language other than C++, then I want it to be very different from C++. You know, things like Python, Ruby, and Erlang, and of course the requisite Emacs-Lisp hacking that one is wont to do.

But more on all that, and various other topics, later. I should have a reasonable amount to talk about over the next while, as I’ve learned a lot by switching industries. Until then, thanks for welcoming me back into your news feed.