How to avoid a race condition when using Scala's Actors
I am writing a piece of code that populates a MongoDB collection once a buffer (list) grows to a certain size.
import scala.actors.Actor
import com.mongodb.casbah.Imports._
import scala.collection.mutable.ListBuffer

class PopulateDB extends Actor {
  val buffer = new ListBuffer[DBObject]
  val mongoConn = MongoConnection()
  val mongoCol = mongoConn("casbah_test")("logs")

  def add(info: DBObject = null) {
    if (info != null) buffer += info
    if (buffer.size > 0 && (info == null || buffer.length >= 1000)) {
      mongoCol.insert(buffer.toList)
      buffer.clear
      println("adding a batch")
    }
  }

  def act() {
    loop {
      react {
        case info: DBObject => add(info)
        case msg if msg == "closeConnection" =>
          println("Close connection")
          add()
          mongoConn.close
      }
    }
  }
}
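For reference, here is a simplified, hypothetical sketch of how I drive the actor (names simplified, not my exact code):

object LoadLogs {
  def main(args: Array[String]) {
    val populator = new PopulateDB
    populator.start()                       // the actor must be started before it reacts to messages
    for (i <- 1 to 5000) {
      populator ! MongoDBObject("n" -> i)   // each message carries a DBObject to buffer
    }
    populator ! "closeConnection"           // flush whatever remains in the buffer and close
  }
}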
However, when I run this code, Scala will occasionally throw a "ConcurrentModificationException" on the "mongoCol.insert(buffer.toList)" line. I am pretty sure it has something to do with "mongoCol.insert". Is there anything fundamentally wrong with the code, or should I use something like the "atomic {...}" construct from Akka to avoid the issue?
Here's the complete stack trace:
PopulateDB@7e859a68: caught java.util.ConcurrentModificationException
java.util.ConcurrentModificationException
at java.util.LinkedHashMap$LinkedHashIterator.nextEntry(LinkedHashMap.java:373)
at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:392)
at java.util.LinkedHashMap$EntryIterator.next(LinkedHashMap.java:391)
at org.bson.BSONEncoder.putObject(BSONEncoder.java:113)
at org.bson.BSONEncoder.putObject(BSONEncoder.java:67)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:215)
at com.mongodb.DBApiLayer$MyCollection.insert(DBApiLayer.java:180)
at com.mongodb.DBCollection.insert(DBCollection.java:85)
at com.mongodb.casbah.MongoCollectionBase$class.insert(MongoCollection.scala:561)
at com.mongodb.casbah.MongoCollection.insert(MongoCollection.scala:864)
at PopulateDB.add(PopulateDB.scala:14)
at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:26)
at PopulateDB$$anonfun$act$1$$anonfun$apply$1.apply(PopulateDB.scala:25)
at scala.actors.ReactorTask.run(ReactorTask.scala:34)
at scala.actors.Reactor$class.resumeReceiver(Reactor.scala:129)
at PopulateDB.scala$actors$ReplyReactor$$super$resumeReceiver(PopulateDB.scala:5)
at scala.actors.ReplyReactor$class.resumeReceiver(ReplyReactor.scala:69)
at PopulateDB.resumeReceiver(PopulateDB.scala:5)
at scala.actors.Actor$class.searchMailbox(Actor.scala:478)
at PopulateDB.searchMailbox(PopulateDB.scala:5)
at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
at scala.actors.Reactor$$anonfun$startSearch$1$$anonfun$apply$mcV$sp$1.apply(Reactor.scala:114)
at scala.actors.ReactorTask.run(ReactorTask.scala:36)
at scala.concurrent.forkjoin.ForkJoinPool$AdaptedRunnable.exec(ForkJoinPool.java:611)
at scala.concurrent.forkjoin.ForkJoinTask.quietlyExec(ForkJoinTask.java:422)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.mainLoop(ForkJoinWorkerThread.java:340)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:325)
Thanks, Derek
DBObject is not thread-safe, and you are sending a DBObject in with your actor message. It is likely being modified again later, which is what causes that concurrent modification problem.
To start with, I would suggest calling clone()
on the DBObject as it comes into the actor and putting that copy into your buffer. It is only a shallow copy, but it should at least be enough to prevent concurrent modification of the LinkedHashMap that backs the keys of a DBObject (which is what keeps them ordered).
I'd try:
def act() {
  loop {
    react {
      case info: DBObject => add(info.clone())
      case msg if msg == "closeConnection" =>
        println("Close connection")
        add()
        mongoConn.close
    }
  }
}
If that doesn't work, look for anywhere else you modify the DBObject after it has been sent to the Actor; the sketch below shows the kind of pattern to look for.
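For example, a pattern like this (hypothetical, assuming a started PopulateDB instance named populator) is enough to trigger the exception:

val doc: DBObject = MongoDBObject("status" -> "start")
populator ! doc                 // the actor may be serializing doc for the insert...
doc.put("status", "done")       // ...while the sender is still mutating it -> ConcurrentModificationException

// Safer: send a fresh (or cloned) DBObject per message and never touch it after sending
populator ! MongoDBObject("status" -> "done")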
Why a class here?

class PopulateDB extends Actor

Do you keep multiple PopulateDB actors? I'd expect object PopulateDB extends Actor, so that a single actor would concentrate this task (see the sketch below).
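A sketch of that change (the body stays the same as in the question):

object PopulateDB extends Actor {
  // ...same buffer, mongoConn, mongoCol, add() and act() as the class version...
}

// Callers then use the singleton directly:
// PopulateDB.start()
// PopulateDB ! someDBObject
// PopulateDB ! "closeConnection"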
Aside from that, the problem seems to be inside Casbah or MongoDB itself.