
Cassandra hector loader app runs out of memory

This simple app takes a comma-delimited file with headers and loads it into Cassandra. It works for small files, but memory usage keeps climbing until an out-of-memory error kills it.

What am I missing?

package com.company;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class QuickLoad {
    public static Keyspace keyspace = null;
    public static void main(String[] args) {
        File file = new File(args[0]);
        String keyspaceName = args[1];
        String columnFamilyName = args[2];
        BufferedReader reader = null;
        try {
            keyspace = GetKeyspace(keyspaceName);
            reader = new BufferedReader(new FileReader(file));
            String fileLine = null;
            String[] headers = null;
            String[] fields = null;
            boolean headerLine = true;

            while ((fileLine = reader.readLine()) != null) {
                if (headerLine){
                    headerLine = false;
                    headers = fileLine.substring(1, fileLine.length()-1).split("\",\"");
                } else {
                    fields = fileLine.substring(1, fileLine.length()-1).split("\",\"");
                    CassandraSave(keyspace, columnFamilyName, headers, fields);
                }
            }
        }
        catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        System.exit(0);
    }

    public static void CassandraSave(Keyspace keyspace, String columnFamily, String[] headers, String[] columns) 
    {
        try 
        {
            Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
            for (int i = 1; i < headers.length-1; i++)
            {
                if ((columns[i] != null) && (!columns[i].equals("null"))) {
                    if (columns[i].length() > 0) {
                        HColumn<String, String> col = HFactory.createStringColumn(headers[i], columns[i]);
                        mutator.insert(columns[1], columnFamily, col);
                    }
                }
            }
            mutator.execute();
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    public static Keyspace GetKeyspace(String keyspaceName)
    {
        String serverAddress = "localhost:9160";
        Cluster cluster = HFactory.getOrCreateCluster("My Cluster", serverAddress);
        Keyspace keyspace = HFactory.createKeyspace(keyspaceName, cluster);
        return keyspace;
    }

}


I could see this being a problem if one of the 'columns' in your input file were larger than your allocated heap. You may be able to fix this by putting an upper bound on the size of your mutation, such that your CassandraSave function only does 100 or so mutations in a single operation.
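A minimal sketch of that bound, assuming the imports and Keyspace setup from the question; the method name, the 100-mutation cap, and the pending counter are illustrative choices, not the poster's code:

    // Sketch (not the original code): bound each batch to ~100 column
    // insertions, queueing with addInsertion() and flushing with execute().
    public static void CassandraSaveBounded(Keyspace keyspace, String columnFamily, String[] headers, String[] columns)
    {
        Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
        int pending = 0; // insertions queued since the last execute()
        for (int i = 1; i < headers.length - 1; i++) {
            if (columns[i] != null && !columns[i].equals("null") && columns[i].length() > 0) {
                // addInsertion queues the column instead of writing it immediately
                mutator.addInsertion(columns[1], columnFamily, HFactory.createStringColumn(headers[i], columns[i]));
                if (++pending >= 100) { // upper bound on the batch size
                    mutator.execute();  // flush the queued mutations
                    pending = 0;
                }
            }
        }
        if (pending > 0) {
            mutator.execute(); // flush the remainder
        }
    }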


One instance of "com.ecyrd.speed4j.log.PeriodicalLog" loaded by "sun.misc.Launcher$AppClassLoader @ 0x899902f8" occupies 127,293,432 (99.62%) bytes.

Looks like you are using an older version of hector and running into a bug with speed4j leaking memory. If you upgrade to hector 0.8.0-2 it should be fixed.

One thing to note: speed4j is disabled by default in 0.8.0-2; if you want to enable it, see this thread.


Two things I see: it's single-threaded, and the batch size is pretty small.

Add an outer loop to collect inserts in the mutator, with a batch size of about 500 rows to start, and see how that goes. Here is an example of a performant mutator insert I use for stress testing: https://github.com/zznate/cassandra-stress/blob/master/src/main/java/com/riptano/cassandra/stress/InsertCommand.java
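For example, a rough sketch of that outer loop built on the question's read loop (the 500-row batch size and the rowsInBatch counter are assumptions, not from the post; reader, headers, and columnFamilyName come from the question's main, and header handling is omitted for brevity):

    // Sketch: reuse one Mutator across rows; queue with addInsertion()
    // and send one batch per ~500 rows instead of one insert per column.
    Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
    int rowsInBatch = 0;
    String fileLine;
    while ((fileLine = reader.readLine()) != null) {
        String[] fields = fileLine.substring(1, fileLine.length() - 1).split("\",\"");
        for (int i = 1; i < headers.length - 1; i++) {
            if (fields[i] != null && !fields[i].equals("null") && fields[i].length() > 0) {
                mutator.addInsertion(fields[1], columnFamilyName, HFactory.createStringColumn(headers[i], fields[i]));
            }
        }
        if (++rowsInBatch >= 500) {
            mutator.execute(); // one round trip for the whole batch
            rowsInBatch = 0;
        }
    }
    if (rowsInBatch > 0) {
        mutator.execute(); // flush the final partial batch
    }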

Also, it is a bit older, but here is a gist of an approach to a parallelized loader that works similarly to what you describe: https://gist.github.com/397574
