Scala memory leak using pull parser

I've been attempting to write an XML parser to read through a Wikipedia XML dump (English language, current revisions only, about 6.2 GB bzipped) and have been using the Scala 2.8.1 pull parser. It gets a reasonable way through (3 million of over 10 million articles) but seems to leak memory gradually and eventually fails with an out-of-heap error. I bumped the heap up to 1.5 GB and it got further (almost to the end), but then I got an error (I forget the exact exception) indicating that the garbage collector was giving up: it was spending a large proportion of processing time without reclaiming much.

My code seems reasonable to me (although it's not idiomatic functional Scala yet) and I can't see any obvious source of leaks. I'm also aware that the pull parser is still being refined, but I am far too aware of my own ignorance to call this a library problem. I'm an experienced C++ and Python programmer, but am just getting into Scala, so I would appreciate any feedback.

import java.io.{FileInputStream, BufferedInputStream}
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.hadoop.io.SequenceFile.{createWriter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.SequenceFile.CompressionType.BLOCK

import scala.io.Source
import scala.xml.pull.{XMLEventReader, EvElemStart, EvElemEnd, EvText}



object Crunch
{
    private def parsePage( parser : XMLEventReader ) : (String, Long, Long, String) =
    {
        var title = ""
        var id = 0
        var revision = 0
        var text = ""
        var done = false
        while ( parser.hasNext && !done )
        {
            parser.next match
            {
                case EvElemStart(_, "title", _, _ ) =>
                {
                    title = getText( parser, "title" )
                }
                /*case EvElemStart(_, "revision", _, _) =>
                {
                    // Need to get the 'id' from revision
                    revision = getText( parser, "revision" ).toInt
                }*/
                case EvElemStart(_, "id", _, _ ) =>
                {
                    id = getText( parser, "id" ).toInt
                }
                case EvElemStart(_, "text", _, _ ) =>
                {
                    text = getText( parser, "text" )
                }
                case EvElemEnd(_, "page") =>
                {
                    done = true
                }
                case _ =>
            }
        }
        return (title, id, revision, text)
    }

    private def getText( parser : XMLEventReader, inTag : String ) : String =
    {
        var fullText = new StringBuffer()
        var done = false
        while ( parser.hasNext && !done )
        {
            parser.next match
            {
                case EvElemEnd(_, tagName ) =>
                {
                    assert( tagName.equalsIgnoreCase(inTag) )
                    done = true
                }
                case EvText( text ) =>
                {
                    fullText.append( text )
                }
                case _ =>
            }
        }
        return fullText.toString()
    }
    def main( args : Array[String] )
    {
        require( args.length == 2 )
        val fin = new FileInputStream( args(0) )
        val in = new BufferedInputStream(fin)
        val decompressor = new BZip2CompressorInputStream(in)

        val runtime = Runtime.getRuntime

        val conf = new Configuration()
        val fs = FileSystem.get(conf)        

        //val writer = createWriter( fs, conf, new Path(args(1)), new Text().getClass(), new Text().getClass(), BLOCK )

        var count = 0
        try
        {
            val source = Source.fromInputStream( decompressor )
            val parser = new XMLEventReader(source)

            while (parser.hasNext)
            {
                parser.next match
                {
                    case EvElemStart(_, "page", attrs, _) =>
                    {
                        val (title, id, revision, text) = parsePage( parser )

                        //writer.append( new Text(title), new Text(text) )

                        count = count + 1
                        if ( count % 100 == 0 )
                        {
                            printf("%s %d (%dMb mem, %dMb free)\n", title, count,
                                (runtime.totalMemory/1024/1024).toInt,
                                (runtime.freeMemory/1024/1024).toInt )
                        }
                    }
                    case _ =>
                }
                // Do something
            }
        }
        finally
        {
            decompressor.close()
            fin.close()
        }

        println( "Finished decompression.")
    }
}


There were two kinds of memory issue with the XML pull parser that have been fixed in trunk:

  1. CData and processing instruction elements preventing garbage collection.
  2. Elements with a lot of children: each child takes a bit of memory, and eventually the heap is exhausted.

The first issue typically causes an out-of-memory error very quickly, so it is unlikely to be what you are seeing.
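To see which pattern a run follows (a fast blow-up or slow growth), one option is to log the heap actually in use rather than total and free memory separately, as the question's code does. The following is only a sketch: `HeapLog`, `usedMb` and the log format are hypothetical names chosen for illustration, and `System.gc()` is just a hint that the JVM may ignore.

```scala
object HeapLog {
  private val Mb = 1024L * 1024L

  // Heap in use: memory the JVM has claimed minus the free part of that claim.
  def usedMb(rt: Runtime): Long = (rt.totalMemory - rt.freeMemory) / Mb

  // One log line for a given page count (hypothetical format).
  def line(count: Int, rt: Runtime): String =
    "%d pages, %d Mb used of %d Mb max".format(count, usedMb(rt), rt.maxMemory / Mb)

  def main(args: Array[String]): Unit = {
    val rt = Runtime.getRuntime
    System.gc() // request a collection first so the figure is less noisy; not guaranteed
    println(line(0, rt))
  }
}
```

Calling `HeapLog.line(count, runtime)` every few thousand pages in the main loop would show whether used heap keeps climbing between collections, which is what a gradual leak would look like.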

Both should be fixed in the 2.9.0 nightly, and I would recommend using it. If you run into issues with 2.9.0 because it is trunk and may not be stable, you can also backport the two patches: download and compile XMLEventReader and MarkupParser locally, package the output as 00patch.jar so that it comes before the Scala library jar, and drop it under $SCALA_HOME/lib of your 2.8.1 install.
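If you go the patch-jar route, it may be worth confirming that the JVM really loads the patched class rather than the one in the stock library jar. A minimal sketch, with `ClassOrigin` as a hypothetical helper name: it asks a class for the URL of its own `.class` file, which for a class loaded from a jar normally includes that jar's path.

```scala
object ClassOrigin {
  // URL of the .class file the given class was loaded from, if the JVM can report it.
  def of(c: Class[_]): Option[String] = {
    val resource = "/" + c.getName.replace('.', '/') + ".class"
    Option(c.getResource(resource)).map(_.toString)
  }

  def main(args: Array[String]): Unit = {
    // Pass a fully qualified class name, e.g. scala.xml.pull.XMLEventReader,
    // after installing the patch jar; defaults to a class that always exists.
    val name = if (args.nonEmpty) args(0) else "java.lang.String"
    println(name + " -> " + of(Class.forName(name)).getOrElse("unknown"))
  }
}
```

Running it with `scala.xml.pull.XMLEventReader` as the argument should print a location; if that location still points at the original library jar, the patch jar is not being found first.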
