Thursday 19 June 2014

Parallel XML processing with Scala & Akka - 3x faster!

I wanted to check how easy or hard it is to optimise XML processing with a parallel/concurrent approach. I have some experience doing it with e.g. Java/Spring (as boring as it is complicated), but I wanted to see how easy it could be with Scala on the JVM and Akka, an actor-based toolkit, for this kind of ETL data transformation.

I prepared 2 projects that do exactly the same thing: they process a Wikivoyage XML dump with wiki markup and gather some statistics about it.
git clone https://github.com/codesurf42/wikiParser.git
git clone https://github.com/codesurf42/wikiParserLinear.git

The first one is based on Akka actors for concurrent processing and Akka agents for counting data; the other is a reference solution (linear processing) for comparing efficiency.
There are also simple metrics counting execution time and number of calls - I found them very useful for checking consistency across both solutions too.

Think asynchronously



XML is processed in a StAX way: this is a linear loop over the input XML stream and, due to its nature (keeping current state), there can only be one such loop.
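To make the "single loop with state" point concrete, here is a minimal sketch of a StAX-style pull loop using the JDK's javax.xml.stream API. It is not the code from the repositories: the object name StaxSketch, the pageTitles helper and the choice to collect <title> text are assumptions made only for illustration.

```scala
import java.io.StringReader
import javax.xml.stream.{XMLInputFactory, XMLStreamConstants}
import scala.collection.mutable.ListBuffer

// Hypothetical helper, not taken from wikiParser.
object StaxSketch {
  // Returns the text of every <title> element, in document order.
  def pageTitles(xml: String): List[String] = {
    val reader = XMLInputFactory.newInstance()
      .createXMLStreamReader(new StringReader(xml))
    val titles = ListBuffer.empty[String]
    val text = new StringBuilder
    var inTitle = false // the "current state" that ties us to one loop

    while (reader.hasNext) {
      reader.next() match {
        case XMLStreamConstants.START_ELEMENT if reader.getLocalName == "title" =>
          inTitle = true
          text.clear()
        case XMLStreamConstants.CHARACTERS if inTitle =>
          text.append(reader.getText)
        case XMLStreamConstants.END_ELEMENT if reader.getLocalName == "title" =>
          inTitle = false
          titles += text.toString
        case _ => // every other event is skipped
      }
    }
    reader.close()
    titles.toList
  }
}
```

Because each event only makes sense in the context of the previous ones (here the inTitle flag), the loop itself cannot be shared between threads; what can be handed off is the work done with each completed element.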

Actors


It is a good idea to keep them simple, one type of job per actor - that way it is simple to tune the system by creating a pool of actors when one stage is significantly slower than the others.

Agents


They are used to properly count single values across actors/threads. They can be very simple, like a counter:

Parser.agentCount.send(_ + 1)

(by contrast, a simple var count inside an actor only counts how many messages that single actor has processed).

They can also take a function that computes the new value, as for the longest article title:

 
// we can just compare which one is the longest
Parser.agentMaxArtTitle.send(current =>
  if (current.length < e.title.length)
    e.title
  else
    current
)
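An agent applies the functions it receives one at a time, so the update above behaves like a fold over the stream of titles. The sketch below models only that behaviour with plain Scala collections; it does not use Akka, and the names LongestTitleSketch, longer and longest are made up for this illustration.

```scala
// Plain-Scala model of the agent update, assuming updates are applied in sequence.
object LongestTitleSketch {
  // Same rule as the function sent to the agent:
  // keep the current value unless the candidate is strictly longer.
  def longer(current: String, candidate: String): String =
    if (current.length < candidate.length) candidate else current

  // Applies the rule to each title in turn, starting from an empty title.
  def longest(titles: Seq[String]): String =
    titles.foldLeft("")(longer)
}
```

Since the rule only replaces the value with a strictly longer title, the resulting length does not depend on the order in which actors send their updates; only ties between equally long titles are decided by arrival order.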




Metrics


There are also some in-project metrics, gathered using actors too - it is very important to see where the time is spent and where the bottlenecks are, so we can prioritise our effort on improving them. They can also help to check consistency across different solutions - we should simply get the same numbers.
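The idea of "execution time plus number of calls per label" can be sketched without actors. The version below is a simplified stand-in that uses a synchronized map instead of the actor-based collection used in the project; MetricsSketch, timed, calls and report are hypothetical names.

```scala
import scala.collection.mutable

// Simplified stand-in for the project's metrics, not the actor-based original.
object MetricsSketch {
  // label -> (total time in microseconds, number of calls)
  private val stats = mutable.Map.empty[String, (Long, Long)]

  // Runs `block`, records how long it took under `label`, returns its result.
  def timed[A](label: String)(block: => A): A = {
    val start = System.nanoTime()
    try block
    finally {
      val micros = (System.nanoTime() - start) / 1000
      stats.synchronized {
        val (total, n) = stats.getOrElse(label, (0L, 0L))
        stats(label) = (total + micros, n + 1)
      }
    }
  }

  def calls(label: String): Long =
    stats.synchronized(stats.get(label).map(_._2).getOrElse(0L))

  // One line per label, slowest first: "label: microseconds, calls".
  def report(): String = stats.synchronized {
    stats.toSeq
      .sortBy { case (_, (total, _)) => -total }
      .map { case (label, (total, n)) => f"$label%25s: $total%12d, $n%10d" }
      .mkString("\n")
  }
}
```

Usage would look like MetricsSketch.timed("seePlaces") { ... } around each stage. Comparing the call counts between two implementations is the consistency check mentioned above.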

Typesafe console


This is a nice tool for profiling your Akka app. You can watch real-time numbers of messages processed by actors, latency in queues and more. This project is downgraded to work with the latest Typesafe console version - to start it with this monitoring tool, simply use:

sbt atmos:run

Be aware that Typesafe console slows down the runtime and can sometimes go crazy.

Tests


There are some BDD-style tests for parsing wiki markup. Writing regexps in Scala is easier than in Java (e.g. better quoting of strings, you can avoid \\n within triple double quotes etc.), but it is still not as clean and pure an experience as in Perl. Shame.
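As a small illustration of the quoting point, here is a sketch that extracts second-level section names from wiki markup. The pattern and the names RegexSketch and sectionNames are assumptions for this example, not the regexps used in the project; the heading syntax assumed is the usual ==Name== form.

```scala
// Hypothetical example, not the project's actual pattern.
object RegexSketch {
  // Inside triple quotes backslashes are literal, so \s and \n need no doubling.
  // The same pattern as a Java string: "(?m)^==\\s*([^=\\n]+?)\\s*==\\s*$"
  private val Heading = """(?m)^==\s*([^=\n]+?)\s*==\s*$""".r

  // Names of second-level sections such as "==See==" or "== Do ==".
  def sectionNames(markup: String): List[String] =
    Heading.findAllMatchIn(markup).map(_.group(1)).toList
}
```

Deeper headings such as ===Sub=== are not matched, because the first character of the captured name must not be an equals sign.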

Time gains and latency


It turned out that even with a quite complex regexp for parsing article sections, processing was too fast to see how actors optimise the data flow. So I deliberately added a 1ms delay in WikiParising.getSection to show these effects better.

Here are timings from WikiParserLinear (in microseconds, with the number of calls):

Exec time: 245.12 sec
LongestArt: 106 (File:FireShot Screen Capture -002 - 'Bali – Travel guides at Wikivoyage' - en wikivoyage org wiki Bali.jpg), count: 49788, 49788
execTime:
         readXml-parseXml:    245083591,          1
               xmlHasNext:    215568312,    4548859
           parseArticle-2:    119644815,      49788
                seePlaces:    118535075,      49788
                 getGeo-1:       549990,      18939
           parseArticle-1:       434810,      49788
           longestArticle:       245901,      49788
                 getGeo-2:        54741,      30849
         readXml-FromFile:        40245,          1
          seePlacesLength:        31297,      49788
[success] Total time: 265 s, completed 19-Jun-2014 00:29:31

and WikiParser (akka) with default actor settings, without any router configurations:

Exec time: 220.03 sec
LongestArt: 0 (File:FireShot Screen Capture -002 - 'Bali – Travel guides at Wikivoyage' - en wikivoyage org wiki Bali.jpg), count: 49788, 24894
execTime:
         readXml-parseXml:    219988018,          1
                seePlaces:    120414366,      49685
               xmlHasNext:     92934186,    4548859
           longestArticle:      1044310,      49788
           parseArticle-1:       804609,      49788
                 getGeo-1:       535376,      18939
           parseArticle-2:       363918,      49788
                 getGeo-2:        77243,      30849
         readXml-FromFile:        44811,          1
          seePlacesLength:        31267,      49685
       count_parseArticle:            0,      49788
execTime:
         readXml-parseXml:    219988018,          1
                seePlaces:    120704828,      49788
               xmlHasNext:     92934186,    4548859
           longestArticle:      1044310,      49788
           parseArticle-1:       804609,      49788
                 getGeo-1:       535376,      18939
           parseArticle-2:       363918,      49788
                 getGeo-2:        77243,      30849
         readXml-FromFile:        44811,          1
          seePlacesLength:        31297,      49788
       count_parseArticle:            0,      49788

This is already faster because, for example, *seePlaces* now runs while *readXml-parseXml* is still in progress.
There are two dumps of the counters because they are not exactly the same: at the time of the first dump some messages were still being processed, as in seePlaces (49685 vs 49788) or seePlacesLength.

Tuning


Now, because we have some bottlenecks, we can do something about them. Let's just get more workers here:

val seePl = system.actorOf(Props[ArticleSeePlacesParser].withRouter(RoundRobinRouter(3)), "seePlaces")


How did it change the timings?

Exec time: 91.51 sec
LongestArt: 0 (File:FireShot Screen Capture -002 - 'Bali – Travel guides at Wikivoyage' - en wikivoyage org wiki Bali.jpg), count: 49788, 49788
execTime:
                seePlaces:    123174283,      49178
         readXml-parseXml:     91491233,          1
               xmlHasNext:     67099980,    4548859
           longestArticle:      2419829,      49788
           parseArticle-2:       647368,      49788
           parseArticle-1:       609389,      49788
                 getGeo-1:       566737,      18939
                 getGeo-2:        80348,      30849
          seePlacesLength:        31106,      49178
         readXml-FromFile:        15315,          1
       count_parseArticle:            0,      49788

This is almost 3x faster than the linear version (245.12 s vs 91.51 s, about 2.7x) and about 2.4x faster than the simple default Akka setup (220.03 s)!

You can see that the cumulative time of *seePlaces* is now longer than the application run time, which depends on the *readXml-parseXml* block.
Technically, all I could do here (on a tiny dual-core i5) was to offload everything possible from the heavy StAX loop and get more hands for *seePlaces*. Perhaps with more cores I could dedicate a single core to the XML/StAX loop, but it is now the biggest bottleneck anyway.

Divide and conquer!


There is room for further enhancements - I can do something about this linear StAX loop, for example divide the whole XML file into a few chunks, with each split point falling between adjacent </page> and <page> elements, and process them concurrently.
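A rough sketch of how such split points could be found is shown here. It works on an in-memory string for simplicity; a real dump would need offsets into the file instead. ChunkSketch and split are hypothetical names and none of this exists in the repositories.

```scala
import scala.collection.mutable.ListBuffer

// Illustration only: cuts a string, whereas a real dump needs file offsets.
object ChunkSketch {
  private val PageEnd = "</page>"

  // Cuts `xml` into at most `parts` pieces, only ever right after a </page> tag.
  def split(xml: String, parts: Int): List[String] = {
    require(parts > 0, "parts must be positive")
    val target = math.max(1, xml.length / parts) // ideal chunk size
    val chunks = ListBuffer.empty[String]
    var start = 0
    while (start < xml.length) {
      // The last allowed chunk takes everything that is left.
      val idx =
        if (chunks.size == parts - 1) -1
        else xml.indexOf(PageEnd, start + target)
      val end = if (idx < 0) xml.length else idx + PageEnd.length
      chunks += xml.substring(start, end)
      start = end
    }
    chunks.toList
  }
}
```

The chunks are not well-formed documents on their own (the root element opens in the first chunk and closes in the last), so each one would still need to be wrapped or parsed as a fragment before being handed to its own StAX loop.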
On bigger machines / clusters there will be more questions: How big should the pool of specific actors be? Should it be auto-resized? How should it change over the execution time, and what strategies can we use? Somehow it should depend on queues, latencies, the number of waiting messages and the priorities of specific tasks/actors (e.g. metrics can wait longer), with routing strategies adapted accordingly - so we can react to backlogs of messages waiting to be processed. And so on...

Check it


All this code is on GitHub (the Akka and linear versions). You should only need sbt to get them working.

3 comments:

Lumir said...

Hello,

I tried to use Scala 2.11.1 but without success. Have you ever tried this version of Scala?

Thank you for the post!

Cheers,
Lumir

Adam said...

Hi Lumir,
Yes, it should work. There is a small difference for XML, which was moved to an external library between 2.10 and 2.11.
I put the changed version into a branch "scala_2.11"
https://github.com/codesurf42/wikiParser/tree/scala_2.11
(the only difference is in build.sbt)

Just be aware that typesafe akka-monitor may not work - at the time of writing it was ready only for Scala 2.10 and I am not sure if they have libs for 2.11 already - that's why I downgraded this project to 2.10 to have more metrics.

Cheers,
Adam

Lumir said...

Hi Adam,

you are completely right. After I removed the akka-monitor from the plugins, I have no problem. I had already modified build.sbt before.

Thank you for the quick answer.

Lumir