More File Processing with Erlang
September 29th, 2007 | Published in erlang
Tim has weighed in with another attempt at using Erlang for his Wide Finder Project, but he’s unhappy with his new solution too. I feel his pain; below, I’ll show another solution which improves on my earlier one, but I’m not entirely happy with this one either.
Tim’s assessment of my earlier solution is accurate. What was worst about my original approach was that it worked only by reading the whole file into memory, which obviously doesn’t work too well for the quarter-gigabyte log files Tim’s dealing with, unless you have a big machine with big memory. Tim also complains about the “number of processes” parameter, but as I’ll show at the very end of this blog entry, it’s not really needed, and it’s there only to allow for experimentation. And finally, Tim said he doesn’t like my ad hoc binary splitting algorithm, but he also points out that for this approach, it’s pretty much necessary, since Erlang doesn’t provide any modules supporting that.
So what improvements does this new version contain?
- First, it addresses the file size issue by using klacke's bfile module, which allows me to work with a full-size logfile rather than Tim's original 10,000-line sample. If klacke hadn't posted this to the erlang-questions mailing list, I wouldn't have even tried to create a new solution. It's a great module.
- Second, it uses a two-level process hierarchy. I observed that with full-sized logfiles, the number of processes launched to perform matching could be quite large, and those processes would finish and seem to sit around waiting for the parent process to collect their results. Not surprisingly, the more processes the main program launched, the slower and slower it became. The two-level process hierarchy launches collectors whose sole job it is to launch processes to perform matching and then collect their results. This results in far fewer messages sitting around waiting for the main thread to collect them, and also allows for a higher degree of concurrency to be applied to both reading the file and collecting the results.
- It still performs binary splitting into data chunks ending on newlines, but I think the algorithm is a little improved. Specifically, it accumulates the “leftovers” and passes them along to the next recursion, where they’re efficiently stuck onto the front of the next chunk. Coincidentally, Tim’s latest approach does something similar, but I don’t think it’s as efficient (but I didn’t try it to verify that, so I could be wrong).
- Finally, at 84 lines including blank ones, the solution has remained relatively brief. This isn't an improvement, but keeping the code short has been an unstated goal for me. After all, the brevity of the Ruby solution is pretty striking, plus if I have to write hundreds of lines of Erlang without achieving a significant speedup, I might as well do it in hundreds of lines of Java, C++, or C instead.
Regardless of these improvements, the best time I achieved with this new solution is 9.8 seconds on an 8-core 2.33 GHz dual Intel Xeon system with 8 GB of RAM. On my dual-core 2.33 GHz 2 GB MacBook Pro, it clocks in at just over 24 seconds. Still too slow.
I’ve been naming my Erlang modules after Tim, given that he’s the originator of this problem, and he’s responsible for my getting even less sleep than usual over the past week. :-) The module for the solution below is called tbray5. The module for my original solution was of course named just tbray. Don’t ask what happened to tbray1 through tbray4; let’s just say that failed experiments are useful too.
There was a tbray6 briefly as well, when I experimented with adding mmap capabilities to klacke’s bfile module. As I mentioned to Tim in email a few days ago, I was wondering whether one could just drop into C, mmap the large logfile into memory, and return a binary representing the whole file back to Erlang. Seems simple enough, and I got it working, but because of mmap’s alignment restrictions combined with the way the Erlang runtime allocates binaries, I was forced to copy the data into the binary, thus killing any performance benefits mmap might have provided.
Anyway, here’s tbray5.erl, and below, I’ll explain each section. Stop here if you’re not interested in the details.
Compiling and Running
Run the following command to compile the code:
erl -smp -make
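Note that erl -smp -make just drives make:all/0, which expects an Emakefile in the current directory. If you need one, a minimal sketch that compiles every .erl file in the current directory (adjust if your bfile sources live elsewhere) looks like this:

%% Emakefile: compile all .erl files found in the current directory
{'*', [debug_info]}.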
To execute it, use one of the following command lines:
erl -smp -noshell -run tbray5 main <numProcs> <logfile>
erl -smp -noshell -run tbray5 main <logfile>
In these command lines, <numProcs> specifies both the number of processes to use for logfile analysis as well as the number of 1k blocks to read from the logfile at a time, and <logfile> specifies the name of the logfile to analyze. Use the first command line to experiment with the number of processes to launch and 1k blocks to read. I found that 512 procs/blocks seems to yield the fastest execution times, so the second command line above defaults to 512, but YMMV.
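For example, assuming a logfile named access.log (substitute your own filename), the first form would be invoked as:

erl -smp -noshell -run tbray5 main 512 access.log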
find_match
The find_match function below is the same as always:
find_match("/ongoing/When/" ++ Last) -> case lists:member($., Last) of false -> 1; true -> 0 end; find_match(_) -> 0.
process_binary
The process_binary function below, which launches “matcher” processes, is the same as before, too, except I switched from lists:foldl to lists:foldr because it seemed to provide a slight speedup. This function receives the ID of a process to send results to, and a string (as an Erlang binary) that’s assumed to end with a newline. It launches a process that breaks the binary into a list of strings, tokenizes each string, then counts the matches using find_match.
process_binary(Pid, Bin) ->
    spawn(
        fun() ->
            L = string:tokens(binary_to_list(Bin), "\n"),
            V = lists:foldr(
                fun(Line, Total) ->
                    Tok = string:tokens(Line, " "),
                    Total + find_match(lists:nth(7, Tok))
                end, 0, L),
            Pid ! V
        end).
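Here’s a rough shell sketch of the message flow, again assuming the function is exported; the log line is a fabricated minimal example whose seventh space-separated token is the request path:

1> Pid = tbray5:process_binary(self(), <<"h - - [d t] \"GET /ongoing/When/200x/2007/09/23/x HTTP/1.1\" 200 1234\n">>).
2> receive Matches -> Matches end.
1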
break_chunk_on_newline
The break_chunk_on_newline set of functions below breaks Erlang binaries read from the logfile into chunks that end with a newline. The first variant handles the case where the binary is already smaller than the desired chunk size. It just returns a 2-tuple consisting of the list of all chunks obtained so far, along with the remainder as a binary. The second variant does most of the work, splitting the binary into chunks of the desired size and walking them along to ensure they end with newlines, and accumulating all the processed chunks into a list. The third variant just encapsulates the chunk size calculation and passes the initial empty chunk accumulator list.
break_chunk_on_newline(Bin, Pos, All) when (Pos >= size(Bin)) -> {All, Bin};
break_chunk_on_newline(Bin, Pos, All) ->
    {_, <<C:8, _/binary>>} = split_binary(Bin, Pos),
    case C of
        $\n ->
            {Ba, Bb} = split_binary(Bin, Pos+1),
            break_chunk_on_newline(Bb, Pos, All ++ [Ba]);
        _ -> break_chunk_on_newline(Bin, Pos+1, All)
    end.

break_chunk_on_newline(Bin, N) -> break_chunk_on_newline(Bin, size(Bin) div N, []).
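For instance, splitting a small 12-byte binary into two roughly equal chunks (assuming the function is exported) yields one newline-terminated chunk plus a leftover binary for the caller to deal with:

1> tbray5:break_chunk_on_newline(<<"a b\nc d\ne f\n">>, 2).
{[<<"a b\nc d\n">>],<<"e f\n">>}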
spawn_collector
The spawn_collector function below just spawns a function that collects match counts from process_binary processes, and then sends the total matches to another process. It takes a list of binaries as an argument and calls process_binary for each one, passing the collector process ID to each, and then it returns the collector process ID. The two-level process hierarchy I talked about above has collectors at the first level and “matcher” processes, spawned by the collectors, at the second level.
spawn_collector(Bins, Me) ->
    Collector = spawn(
        fun() ->
            V = lists:foldr(fun(_, T) -> receive V -> T + V end end, 0, Bins),
            Me ! V
        end),
    [process_binary(Collector, B) || B <- Bins],
    Collector.
scan_finish
The scan_finish set of functions below handles the remainder binary, the last one after all file reading and binary splitting is done. It ensures that a collector is spawned to handle the remainder, if there is one. The first variant is called if the remainder is empty, the second one otherwise.
scan_finish(<<>>, _, Pids) -> Pids;
scan_finish(More, Me, Pids) -> [spawn_collector([More], Me) | Pids].
scan_file
The scan_file set of functions below reads chunks of the logfile via bfile:fread, breaks each chunk via break_chunk_on_newline, and spawns collectors to process them. It handles any remainder binaries by prepending them to the front of the next chunk, or when the file is completely read, by passing any remainders to scan_finish. Note that the first variant of scan_file does all the work; the second one just initializes the recursion. The return value of scan_file is the list of collector process IDs.
scan_file(F, N, Readsize, Me, Leftover, Pids) ->
    Rd = bfile:fread(F, Readsize),
    case Rd of
        {ok, Bin} ->
            {Bins, More} = break_chunk_on_newline(list_to_binary([Leftover, Bin]), N),
            scan_file(F, N, Readsize, Me, More, [spawn_collector(Bins, Me) | Pids]);
        eof ->
            scan_finish(Leftover, Me, Pids)
    end.

scan_file(F, N, Readsize) -> scan_file(F, N, Readsize, self(), <<>>, []).
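The list_to_binary([Leftover, Bin]) call is where the leftover prepending happens: list_to_binary/1 accepts an iolist, so the two binaries are joined into a single new binary in one pass, with no intermediate list conversions. For example:

1> list_to_binary([<<"tail of previous chunk ">>, <<"next chunk">>]).
<<"tail of previous chunk next chunk">>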
start
The start functions below initialize bfile, call scan_file, and then collect results from the collector processes. The second variant sets the number of bytes to read at a time from the logfile to a default of 512 1k blocks.
start(Num, File, Readsize) ->
    bfile:load_driver(),
    {ok, F} = bfile:fopen(File, "r"),
    Pids = scan_file(F, Num, Readsize),
    bfile:fclose(F),
    lists:foldr(fun(_, T) -> receive V -> T + V end end, 0, Pids).

start(Num, File) -> start(Num, File, 512*1024).
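The same entry point also works directly from the Erlang shell, which is handy for quick experiments; the filename here is hypothetical, and the return value is the total match count:

1> Matches = tbray5:start(512, "access.log").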
main
And finally, the main functions below handle invocations from the shell command line, as explained earlier. The second variant runs through a list of values used for both the number of processes and the number of 1k blocks to read from the logfile at a time, and prints out a list consisting of each value and the number of seconds it took to execute for that value.
main([N, F]) ->
    io:format("~p matches found~n", [start(list_to_integer(N), F)]),
    halt();
main([F]) ->
    Sz = [16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384],
    Results = lists:map(
        fun(S) ->
            Start = now(),
            start(S, F, S*1024),
            {S, time_diff(Start, now())}
        end, Sz),
    io:format("~p~n", [Results]),
    halt().
The time_diff function (not shown but included in tbray5.erl), which I borrowed from somewhere a few months back, just helps calculate the execution times for the second variant of main shown above.