I prepared two projects that do exactly the same thing - process the Wikivoyage XML/wiki-markup dump and gather some statistics about it.
git clone https://github.com/codesurf42/wikiParser.git
git clone https://github.com/codesurf42/wikiParserLinear.git
The first one is based on Akka actors for concurrent processing and Akka agents for counting data; the other is a reference solution (linear processing) to compare efficiency against.
There are also simple metrics counting execution time and the number of calls - I found them very useful for checking consistency across both solutions too.
Think asynchronously
XML is processed in a StAX way - a linear loop over the input XML stream - and due to its nature (it keeps the current parser state) that loop has to remain single.
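Schematically, that loop might look like this (a minimal sketch using the standard javax.xml.stream API; the file name and parserActor are illustrative, not the actual project code):

import java.io.FileInputStream
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}

val reader = XMLInputFactory.newInstance()
  .createXMLStreamReader(new FileInputStream("enwikivoyage.xml"))

// the single StAX loop: it owns the parser state, so it stays sequential;
// anything expensive is handed off to actors as messages
while (reader.hasNext) {
  reader.next() match {
    case XMLStreamConstants.START_ELEMENT if reader.getLocalName == "page" =>
      // accumulate the page here and fire it off, e.g. parserActor ! page
      ()
    case _ => ()
  }
}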
On bigger machines or clusters there will be more questions:

How big should the pool of a specific actor type be? Should it auto-resize? How should it change over execution time, and what strategies can we use? Somehow this should depend on queues, latencies, the number of waiting messages and the priorities of specific tasks/actors (e.g. metrics can wait longer), with routing strategies adapting to all of it - so we can react to backlogs of messages waiting to be processed. And so on...

Actors
A good idea is to keep them simple, with one type of job per actor - that way it is easy to tune the system by creating a pool of actors for a stage that is significantly slower than the others.
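For example, a worker can be as small as this (a sketch only - the class and message here are illustrative, not necessarily the project's code):

import akka.actor.Actor

case class Article(title: String, text: String)

// exactly one type of job: parse the "see places" part of an article
class SeePlacesWorker extends Actor {
  def receive = {
    case Article(title, text) =>
      // the single job goes here; if this stage turns out to be slow,
      // we just put a router in front of a pool of these workers
      ()
  }
}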
Agents
Agents are used to safely update single values shared across actors/threads. They can be very simple, like a counter:
Parser.agentCount.send(_ + 1)
(the simple var count kept "in an actor", by contrast, only counts how many messages that single actor has processed).
They can also take a function to compute the new value, for example for the longest article title:
// we can just compare which one is the longest
Parser.agentMaxArtTitle.send(current =>
  if (current.length < e.title.length) e.title else current
)
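For completeness, the agents themselves can be declared like this (a sketch assuming the Akka 2.3-style akka.agent API; the actual Parser object in the repo may look different):

import akka.agent.Agent
import scala.concurrent.ExecutionContext.Implicits.global

object Parser {
  val agentCount = Agent(0)         // how many articles we have seen
  val agentMaxArtTitle = Agent("")  // the longest article title so far
}

Reading the current value is then as simple as Parser.agentCount().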
Metrics
There are also some in-project metrics, gathered by using actors too - it is very important to see where the time is spent and where the bottlenecks are, so we can prioritise our effort on improving them. They can also help to check consistency across different solutions - we should simply get the same numbers.
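A minimal sketch of such a collector (illustrative, not the project's actual implementation) - it accumulates total time and the number of calls per label, which is exactly the shape of the dumps below:

import akka.actor.Actor

case class Timing(label: String, micros: Long)

// accumulates total execution time (in microseconds) and call counts per label
class MetricsActor extends Actor {
  private var totals = Map.empty[String, (Long, Long)] // label -> (micros, calls)

  def receive = {
    case Timing(label, micros) =>
      val (time, calls) = totals.getOrElse(label, (0L, 0L))
      totals = totals.updated(label, (time + micros, calls + 1))
    case "dump" =>
      totals.foreach { case (label, (time, calls)) => println(s"$label: $time, $calls") }
  }
}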
Typesafe console
This is a nice tool for profiling your Akka app. You can watch real-time numbers of messages processed by actors, latency in queues and more. This project is down-versioned to stay compatible with the latest Typesafe Console release - to start it with this monitoring tool, simply use:
sbt atmos:run
Be aware that the Typesafe Console slows down the runtime and can sometimes go crazy.
Tests
There are some BDD-style tests for parsing the wiki markup. Writing regexps in Scala is easier than in Java (e.g. better quoting of strings - you can avoid \\n inside triple-doublequoted strings), but it is still not the clean and pure experience you get in Perl. Shame.
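To give a flavour, a spec could look roughly like this (a hypothetical ScalaTest example, not the repo's actual suite - note the triple-quoted regexp, which needs no double escaping):

import org.scalatest.{FlatSpec, Matchers}

class SectionParserSpec extends FlatSpec with Matchers {

  // (?s) lets . match newlines; the lookahead stops at the next section or end of input
  val seeSection = """(?s)==\s*See\s*==(.*?)(?===|\z)""".r

  "The section parser" should "extract the See section from wiki markup" in {
    val markup = "== See ==\n* Beach\n== Do ==\n* Surf"
    seeSection.findFirstMatchIn(markup).map(_.group(1).trim) shouldBe Some("* Beach")
  }
}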
Time gains and latency
It turned out that even with quite a complex regexp for parsing article sections, processing was too fast to show how the actors optimise the data flow. So I deliberately added a 1 ms delay in WikiParising.getSection to show these effects better.
Here are the timings from WikiParserLinear (in microseconds, with the number of calls):
Exec time: 245.12 sec
LongestArt: 106 (File:FireShot Screen Capture -002 - 'Bali – Travel guides at Wikivoyage' - en wikivoyage org wiki Bali.jpg), count: 49788, 49788
execTime:
  readXml-parseXml: 245083591, 1
  xmlHasNext: 215568312, 4548859
  parseArticle-2: 119644815, 49788
  seePlaces: 118535075, 49788
  getGeo-1: 549990, 18939
  parseArticle-1: 434810, 49788
  longestArticle: 245901, 49788
  getGeo-2: 54741, 30849
  readXml-FromFile: 40245, 1
  seePlacesLength: 31297, 49788
[success] Total time: 265 s, completed 19-Jun-2014 00:29:31
and from WikiParser (akka) with default actor settings, without any router configuration:
Exec time: 220.03 sec
LongestArt: 0 (File:FireShot Screen Capture -002 - 'Bali – Travel guides at Wikivoyage' - en wikivoyage org wiki Bali.jpg), count: 49788, 24894
execTime:
  readXml-parseXml: 219988018, 1
  seePlaces: 120414366, 49685
  xmlHasNext: 92934186, 4548859
  longestArticle: 1044310, 49788
  parseArticle-1: 804609, 49788
  getGeo-1: 535376, 18939
  parseArticle-2: 363918, 49788
  getGeo-2: 77243, 30849
  readXml-FromFile: 44811, 1
  seePlacesLength: 31267, 49685
  count_parseArticle: 0, 49788
execTime:
  readXml-parseXml: 219988018, 1
  seePlaces: 120704828, 49788
  xmlHasNext: 92934186, 4548859
  longestArticle: 1044310, 49788
  parseArticle-1: 804609, 49788
  getGeo-1: 535376, 18939
  parseArticle-2: 363918, 49788
  getGeo-2: 77243, 30849
  readXml-FromFile: 44811, 1
  seePlacesLength: 31297, 49788
  count_parseArticle: 0, 49788
This is already faster because stages overlap - for example, *seePlaces* does its work while *readXml-parseXml* is still running.
There are two dumps of the counters because they are not exactly the same - there are still some messages being processed, as you can see in seePlaces (49685 vs 49788) or seePlacesLength.
Tuning
Now, since we know where the bottlenecks are, we can do something about them. Let's just get more workers here:
val seePl = system.actorOf(Props[ArticleSeePlacesParser].withRouter(RoundRobinRouter(3)), "seePlaces")
How did this change the timings?
Exec time: 91.51 sec
LongestArt: 0 (File:FireShot Screen Capture -002 - 'Bali – Travel guides at Wikivoyage' - en wikivoyage org wiki Bali.jpg), count: 49788, 49788
execTime:
  seePlaces: 123174283, 49178
  readXml-parseXml: 91491233, 1
  xmlHasNext: 67099980, 4548859
  longestArticle: 2419829, 49788
  parseArticle-2: 647368, 49788
  parseArticle-1: 609389, 49788
  getGeo-1: 566737, 18939
  getGeo-2: 80348, 30849
  seePlacesLength: 31106, 49178
  readXml-FromFile: 15315, 1
  count_parseArticle: 0, 49788
This is almost 3x faster than the linear version and 2.5x faster than the simple default Akka setup!
As you can see, the cumulative time of *seePlaces* is now longer than the whole application run time, which is bound by the *readXml-parseXml* block.
Technically, all I could do here (on a tiny dual-core i5) was to off-load everything possible from the heavy StAX loop and get more hands for *seePlaces*. Perhaps with more cores I could dedicate a whole core to "feeding" the XML/StAX loop, but it is now the biggest bottleneck anyway.
Divide and conquer!
There is room for further enhancement - I could do something about the linear StAX loop, for example dividing the whole XML file into a few chunks, where each split point lies between the nearest </page> and <page> elements, and processing those chunks concurrently, as sketched below.
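A rough sketch of how the split points could be found (illustrative only - a real multi-GB dump would need streaming and byte-accurate offsets rather than reading the whole file into memory):

import scala.io.Source

// cut the file into n chunks, moving each cut forward to the end of the
// nearest </page>, so that every chunk contains only complete pages
def splitOffsets(path: String, n: Int): Seq[Int] = {
  val text = Source.fromFile(path, "UTF-8").mkString
  val step = text.length / n
  (1 until n).map { i =>
    val idx = text.indexOf("</page>", i * step)
    if (idx == -1) text.length else idx + "</page>".length
  }
}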
Check it
All this code is on GitHub (akka version, linear version). You should only need sbt to get them working.