Better living through parallelism

Judging by the interest to my last actors post, I thought I’d throw up a piece of code that uses actors anonymously to parallelise a long running operation. Not every operation can be parallelised, most things we work on tend to be fairly sequential. However, sometimes if you can split up the work to perform an operation, you can get some serious bang for your buck.

Let’s take a pretty interesting little problem as an example, working out pi. I first encountered this problem in a job interview a while back (yeah, I thought it was out there too). It’s an example of what’s known as a Monte Carlo Simulation. The general way to work this out is described in detail in this Google Code University tutorial on parallel programming (worth taking a look).

The genaral approach works on the basis of this diagram.

The steps are as follows:

  1. generate points randomly in the square
  2. work out whether each point falls into the circle (distance from centre is less than the radius, use Pythagoras here)
  3. the proportion of points in the circle to those in the square is approximately the same as that of the areas of the two shapes; transpose the area formulas (I’ll show it in the code) to work out an approximate value of pi

It’s a nice little problem, because the more points you generate, the better the estimate gets. It’s also easily parallelisable as the first two steps can be performed repeatedly by anyone. Let’s do a first pass in a single thread:

package net.jakubkorab.pi_evaluator

import scala.actors._
import scala.actors.Actor._
import scala.math._

object PiEvaluator {
	val sideLength : Double = 1
	val radius : Double = sideLength / 2 
	val totalPointsToSample : Long = 10000

	def isRandomPointWithinCircleRadius() : Boolean = {
		val x = (random * sideLength) - radius
		val y = (random * sideLength) - radius
		val hypotenuse = sqrt(pow(abs(x), 2) + pow(abs(y), 2)) // pythagoras
		hypotenuse <= radius
	}
	
	def pointsInCircle(pointsToSample : Long) : Long = { 
		(1L to pointsToSample)
			.map((i : Long) => isRandomPointWithinCircleRadius())
			.foldLeft(0L) { (pointCount : Long, pointInCircle : Boolean) => 
				if (pointInCircle) pointCount + 1 else pointCount
			}
	}

	def approximatePi(pointsEvaluated : Long, pointsInCircle : Long) = {
		println("After " + pointsEvaluated 
			+ " samples, the number of points in a circle was " + pointsInCircle)
		
		val areaOfSquare = sideLength * sideLength
		// areaOfCircle/areaOfSquare =~ pointsInCircle/pointsEvaluated, so
		val areaOfCircle = pointsInCircle * areaOfSquare / pointsEvaluated
		// areaOfCircle = pi * r^2, so
		val approximatePi = areaOfCircle / pow(radius, 2)
		println("Pi is approx: " + approximatePi) 
	}

	def main(args : Array[String]) = {
		approximatePi(totalPointsToSample, pointsInCircle(totalPointsToSample))
	}
}

At line 20, there’s a nice little bit of map-reduce action going on (fold is an equivalent name). What’s going on is that for each number, we’re generating a random number an determining whether it’s in a circle. Then we fold/reduce the list of booleans returned by the map function to give a count of all the points that were generated within the radius of the circle. It’s a super useful little programming construct.

We then call approximatePi() to do our maths for us.

Running this (I’m doing it through SBT, that’s why the [Info] outputs) we get:

After 10000 samples, the number of points in a circle was 7883
Pi is approx: 3.1532
[info] == run ==
[success] Successful.
[info]
[info] Total time: 5 s, completed 30-Jan-2011 13:45:46

Not bad, but we need more points to get a better result. We know that pi should be around 3.14159, but we’re not quite there yet. Scaling up our numbers:

Samples Time (s) Pi
10,000 5 3.1532
100,000 5 3.13676
1,000,000 7 3.141188
10,000,000 20 3.1424864
100,000,000 189 3.14139356

Ok, we’re approaching our known value, but this approach has shown some stress. It’s not scaling too well. It’s slowing right down, but my processor (Intel Core2 Duo T9300 @ 2.5GHz) is showing only ~60% usage. If we could break down the point generation among a number of threads, and then tally up the result at the end, we could make better use of our hardware. Enter fork-join.

Fork-join is probably one of the most common things that you’d want to do with concurrency. It’s a bit of a pain in the butt in Java (task executor framework, futures etc.). It really shouldn’t be. There’s some excellent work going on around a framework for this, and map-reduce too, for future versions of the language. In the meantime, you need to jump through hoops – cue fishing through Concurrency in Practice. In Scala though, it’s really easy using actors. So we rewrite main:

	val workers = 10
	def main(args : Array[String]) = {
		// break down the job between worker - there's a rounding issue here, but never mind
		val samplesPerWorker = totalPointsToSample / workers 
		1 to workers foreach { (workerNumber : Int) => 
			val worker = actor {
				receive {
					case pointsToEvaluate : Long => { 
						sender ! (pointsToEvaluate, pointsInCircle(pointsToEvaluate)) // reply with a tuple
					} 
				}
			}
			worker ! samplesPerWorker 
		}
		
		var workersReplied = 0
		var totalPointsEvaluated = 0L
		var totalPointsInCircle = 0L
		while (workersReplied < workers) {
			receive {
				case (pointsEvaluated : Long, pointsInCircle : Long) => {
					workersReplied += 1
					totalPointsEvaluated += pointsEvaluated
					totalPointsInCircle += pointsInCircle
					if (workersReplied % 100 == 0) {
						println(workersReplied + " workers replied")
					}
				}
			}
		}
		approximatePi(totalPointsEvaluated, totalPointsInCircle)
	}
Samples Workers Time (s) Pi
10,000,000 10 18 3.1427624
100,000,000 10 64 3.1418042

64s is a lot better than our original 189s! And this effect just gets more pronounced the more processors you have (where the optimal number of actors matches the number of processors; at least it should be – feel free to play around with this).

In the second version of main, the actor keyword/block is actually a method on the Actor object that creates an anonymous actor instance and instantly starts it. We ping some messages to it, and wait for the results using receive().

Easy, and you can apply it for a ton of different tasks.


Posted

in

, ,

by

Tags:

Comments

2 responses to “Better living through parallelism”

  1. Tomás Lázaro Avatar
    Tomás Lázaro

    Hi, nice post showing how to use Actors!

    One thing though, and I don’t want to be an asshole, I’m sure you did to be ilustrative rather than hackish but:

    val hypotenuse = sqrt(pow(abs(x), 2) + pow(abs(y), 2)) // pythagoras
    hypotenuse <= radius

    could be made much faster as:

    x * x + y * y <= radius * radius // radius square should better be saved as a val but compiler should still optimize it anyway

  2. Frans van Buul Avatar

    Hi, Jakub, interesting post!

    I ran your test program on Core i7 740QM processor. I used Scala 2.8.1. I used System.currentTimeMillis for performance measurement. I fixed the number of points to 100.000.000, and varied the workers. The resulting pi estimates are pi +/- 0.0002.

    The performance surprised me, though:

    single: 12 s
    1 worker: 12 s
    2 workers: 17 s
    3 workers: 22 s
    6 workers: 22 s
    10 workers: 23 s
    20 workers: 23 s
    50 workers: 23 s

    While I added more workers, CPU load increased (more cores got used), but actual performance didn’t increase! So something’s not scaling well. Any ideas?

    On a side note, the implementation with map + foldLeft is cool from an FP perspective, but also takes a lot more memory than is strictly required.

    Kind regards,
    Frans van Buul