开发者

Uploading large gzipped data files to HDFS

I have a use case where I want to upload big gzipped text data files (~ 60 GB) on HDFS.

My code below takes about 2 hours to upload these files in chunks of 500 MB. The pseudo code follows. I was checking whether somebody could help me reduce this time:

i) int fileFetchBuffer = 500000000; System.out.println("file fetch buffer is: " + fileFetchBuffer); int offset = 0; int bytesRead = -1;

    try {
        fileStream = new FileInputStream (file);    
        if (fileName.endsWith(".gz")) {
            stream = new GZIPInputStream(fileStream);

            BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); 

            String[] fileN = fileName.split("\\.");
            System.out.println("fil 0 : " + fileN[0]);
            System.out.println("fil 1 : " + fileN[1]);
            //logger.info("First line is: " + streamBuff.readLine());

            byte[] buffer = new byte[fileFetchBuffer];

            FileSystem fs = FileSystem.get(conf);

            int charsLeft = fileFetchBuffer;
            while (true) {

                charsLeft = fileFetchBuffer;    



             logger.info("charsLeft outside while: " + charsLeft);

          FSDataOutputStream dos = null;
                while (charsLeft != 0) {
                    bytesRead = stream.read(buffer, 0, charsLeft);
                    if (bytesRead < 0) {
                        dos.flush();
                        dos.close();
                        break;
                    }
                    offset = offset + bytesRead;
                    charsLeft = charsLeft - bytesRead; 
                    logger.info("offset in record: " + offset);
                    logger.info("charsLeft: " + charsLeft);
                    logger.info("bytesRead in record: " + bytesRead);
                    //prettyPrintHex(buffer);

                    String outFileStr = Utils.getOutputFileName(
                            stagingDir,
                            fileN[0],
                            outFileNum);

                    if (dos == null) {
                    Path outFile = new Path(outFileStr);
                    if (fs.exists(outFile)) {
                        fs.delete(outFile, false);
                    }

                    dos = fs.create(outFile);
                    }

                    dos.write(buffer, 0, bytesRead);


                } 

                logger.info("done writing: " + outFileNum);
                dos.flush();
                dos.close();

                if (bytesRead < 0) {
                    dos.flush();
                    dos.close();
                    break;
                }

                outFileNum++;

            }  // end of if


        } else {
            // Assume uncompressed file
            stream = fileStream;
        }           

    } catch(FileNotFoundException e) {
        logger.error("File not found" + e);
    }


You should consider using the super package IO from Apache.

It has a method

IOUtils.copy( InputStream, OutputStream )

that would tremendously reduce time needed to copy your files.


I tried with buffered input stream and saw no real difference. I suppose a file channel implementation could be even more efficient. Tell me if it's not fast enough.

package toto;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * Command-line utility that cuts one big file into consecutive fixed-size chunks.
 *
 * <p>Usage: {@code toto.Slicer <big file> <chunk name radix> <chunk size>}
 */
public class Slicer {

    /** Size in bytes of the in-memory copy buffer used for each read/write. */
    private static final int BUFFER_SIZE = 50000;

    /**
     * Entry point: parses the three command-line arguments and runs {@link #slice}.
     *
     * @param args input file name, output file radix and chunk size in bytes
     */
    public static void main(String[] args) {

        try
        {
            slice( args[ 0 ], args[ 1 ], Long.parseLong( args[ 2 ] ) );
        }//try
        catch( IOException e )
        {
            // A genuine I/O failure: the arguments were fine, so no usage message.
            e.printStackTrace();
        }//catch
        catch( Exception ex )
        {
            // Missing or malformed arguments (too few args, non numeric or non positive size).
            ex.printStackTrace();
            System.out.println( "Usage :  toto.Slicer <big file> <chunk name radix > <chunks size>" );
        }//catch
    }//met

    /**
     * Slices a huge file into chunks.
     *
     * <p>Every slice except possibly the last one is exactly {@code chunkSize} bytes long.
     * A slice file is only created once there is at least one byte to put in it, so an
     * input whose length is an exact multiple of {@code chunkSize} does not produce a
     * trailing empty slice, and an empty input produces no slice at all.
     *
     * @param inputFileName the big file to slice.
     * @param outputFileRadix the base name of slices generated by the slicer. All slices will then be numbered outputFileRadix0,outputFileRadix1,outputFileRadix2...
     * @param chunkSize the size of chunks in bytes, must be strictly positive
     * @return the number of slices written.
     * @throws IllegalArgumentException if {@code chunkSize} is zero or negative
     * @throws IOException if the input cannot be read or a slice cannot be written
     */
    public static int slice( String inputFileName, String outputFileRadix, long chunkSize ) throws IOException
    {
        // A zero chunk size would make every read return 0 bytes and roll over to a new
        // (empty) slice forever; a negative one is meaningless. Reject both up front.
        if( chunkSize <= 0 )
            throw new IllegalArgumentException( "chunkSize must be positive but was " + chunkSize );

        //I would add some code to pretty print the output file names
        //I mean adding a couple of 0 before chunkNumber in output file name
        //so that they all have same number of chars
        //use java.io.File for that, estimate number of chunks, take power of 10, got number of leading 0s

        //just to get some stats
        long timeStart = System.currentTimeMillis();
        long timeStartSlice = timeStart;
        long timeEnd = 0;

        //number of slices created so far; also the index of the next slice to create
        int chunkCount = 0;
        //the slice currently being filled, null when no slice is open
        FileOutputStream fos = null;

        try( FileInputStream fis = new FileInputStream( inputFileName ) )
        {
            byte[] buffer = new byte[ BUFFER_SIZE ];
            long bytesInChunk = 0;
            int bytesRead;

            //never read past the end of the current slice, so a read never straddles two slices
            while( ( bytesRead = fis.read( buffer, 0, (int)Math.min( BUFFER_SIZE, chunkSize - bytesInChunk ) ) ) != -1 )
            {
                if( fos == null )
                {
                    //open the slice lazily: only when there is data for it
                    System.out.println( "Creating slice number " + chunkCount );
                    fos = new FileOutputStream( outputFileRadix + chunkCount );
                    chunkCount ++;
                }//if

                fos.write( buffer, 0, bytesRead );
                bytesInChunk += bytesRead;

                if( bytesInChunk == chunkSize )
                {
                    //slice is full: close it, the next one is opened on the next successful read
                    fos.close();
                    fos = null;
                    timeEnd = System.currentTimeMillis();
                    System.out.println( "Chunk "+ (chunkCount - 1) + " has been generated in "+ (timeEnd - timeStartSlice) +" ms");
                    bytesInChunk = 0;
                    timeStartSlice = timeEnd;
                }//if
            }//while
        }//try
        finally
        {
            //whatever happens close the slice being written (the input is closed by the try header)
            System.out.println( "Closing all files.");
            if( fos != null )
                fos.close();
        }//fin

        timeEnd = System.currentTimeMillis();
        System.out.println( "Total slicing time : " + (timeEnd - timeStart) +" ms" );
        System.out.println( "Total number of slices "+ chunkCount );

        return chunkCount;
    }//met
}//class

Greetings, Stéphane

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜