
Preferred way of processing this data with parallel arrays

Imagine a sequence of java.io.File objects. The sequence is not in any particular order; it gets populated by a directory traversal. The file names can look like this:

/some/file.bin
/some/other_file_x1.bin
/some/other_file_x2.bin
/some/other_file_x3.bin
/some/other_file_x4.bin
/some/other_file_x5.bin
...
/some/x_file_part1.bin
/some/x_file_part2.bin
/some/x_file_part3.bin
/some/x_file_part4.bin
/some/x_file_part5.bin
...
/some/x_file_part10.bin

Basically, I can have 3 types of files. The first type is the simple one, which only has a .bin extension. The second type is formed from the pieces _x1.bin through _x5.bin. The third type is formed from 10 smaller parts, _part1 through _part10. I know the naming may be strange, but this is what I have to work with :)

I want to group the files together (all the pieces of a file should be processed together), and I was thinking of using parallel arrays to do this. The thing I'm not sure about is how to perform the reduce/accumulation part, since all the threads will be working on the same array.

val allBinFiles = allBins.toArray // array of java.io.File

I was thinking of handling it with something like this:

import java.io.File
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

// shared accumulator: a synchronized Java map, wrapped as a Scala mutable map
// note: getOrElseUpdate followed by += is still two separate steps, so the
// compound update is not atomic -- this is the part I'm unsure about
val mapAccumulator =
  java.util.Collections
    .synchronizedMap(new java.util.HashMap[String, ListBuffer[File]]())
    .asScala

allBinFiles.par.foreach { file =>
   file match {
      // for something like /some/x_file_x4.bin, nameTillPart will be /some/x_file
      case ComposedOf5Name(nameTillPart) =>
          mapAccumulator.getOrElseUpdate(nameTillPart, new ListBuffer[File]()) += file
      case ComposedOf10Name(nameTillPart) =>
          mapAccumulator.getOrElseUpdate(nameTillPart, new ListBuffer[File]()) += file
      // simple file, without any pieces
      case _ =>
          mapAccumulator.getOrElseUpdate(file.toString, new ListBuffer[File]()) += file
   }
}

I was thinking of doing it like I've shown in the code above: having extractors for the files and using part of the path as the key in the map. For example, the key /some/x_file could hold as values /some/x_file_x1.bin through /some/x_file_x5.bin. I also think there could be a better way of handling this. I would be interested in your opinions.
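The ComposedOf5Name and ComposedOf10Name extractors are not shown above. A minimal sketch of how they might be written, assuming the suffix patterns from the example names (the regexes and object bodies here are illustrative, not from the original post):

import java.io.File

// hypothetical extractors (assumed): strip the "_xN.bin" / "_partN.bin"
// suffix and return the common prefix (e.g. /some/x_file) as the group key
object ComposedOf5Name {
  private val Pattern = """(.*)_x[1-5]\.bin""".r
  def unapply(f: File): Option[String] = f.getPath match {
    case Pattern(prefix) => Some(prefix)
    case _               => None
  }
}

object ComposedOf10Name {
  private val Pattern = """(.*)_part(?:[1-9]|10)\.bin""".r
  def unapply(f: File): Option[String] = f.getPath match {
    case Pattern(prefix) => Some(prefix)
    case _               => None
  }
}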


The alternative is to use groupBy:

val mp = allBinFiles.par.groupBy {
  case ComposedOf5Name(x) => x
  case ComposedOf10Name(x) => x
  case f => f.toString
}

This will return a parallel map of parallel arrays of files (ParMap[String, ParArray[File]]). If you want a sequential map of sequential sequences of files from this point:

val sqmp = mp.map { case (k, v) => (k, v.seq) }.seq

To ensure that the parallelism kicks in, make sure you have enough elements in your parallel array (10k+).
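For completeness, a small sketch of how the grouped result could then be consumed. processGroup is a hypothetical placeholder for whatever per-group work is needed; it is not part of the original answer:

import java.io.File

// hypothetical per-group processing step (assumed placeholder)
def processGroup(key: String, parts: List[File]): Unit =
  println(s"$key -> ${parts.map(_.getName).mkString(", ")}")

// mp is the ParMap[String, ParArray[File]] built by groupBy above;
// each logical file's pieces stay together in one group, while the
// groups themselves can still be processed in parallel
mp.foreach { case (key, parts) => processGroup(key, parts.seq.toList) }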
