Need an explanation of transferring binary data using Thrift RPC

Let's say I defined the following Thrift service:

service FileResource {
  binary get_file(1:string file_name)
}

Here is the generated implementation, which I cannot understand:

public ByteBuffer recv_get_file() throws org.apache.thrift.TException
{
  org.apache.thrift.protocol.TMessage msg = iprot_.readMessageBegin();
  if (msg.type == org.apache.thrift.protocol.TMessageType.EXCEPTION) {
    org.apache.thrift.TApplicationException x = org.apache.thrift.TApplicationException.read(iprot_);
    iprot_.readMessageEnd();
    throw x;
  }
  if (msg.seqid != seqid_) {
    throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.BAD_SEQUENCE_ID, "get_file failed: out of sequence response");
  }
  get_file_result result = new get_file_result();
  result.read(iprot_);
  iprot_.readMessageEnd();
  if (result.isSetSuccess()) {
    return result.success;
  }
  throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "get_file failed: unknown result");
}

How does the line

 result.read(iprot_);

work? Is it synchronous or asynchronous? How will it work for large data (several megabytes and more)? And what do I need in order to read that data? Unfortunately, I'm not used to working with java.nio and ByteBuffer. Any examples or guides would be nice.


I think you misunderstood what Apache Thrift is for. If it were this complicated, using Java NIO directly would be easier...

How will it work for large data (several megabytes and more)?

Thrift should take care of transporting that data for you. As for performance, it depends heavily on your hardware and the quality of the network, but Thrift itself performs pretty well.
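
One caveat for large files: a binary return value arrives as a single ByteBuffer, so the whole file sits in memory at once. A possible way to stay within ordinary Thrift calls is to fetch the file piece by piece. The sketch below assumes a hypothetical method such as binary get_chunk(1:string file_name, 2:i64 offset, 3:i32 max_length), which is not part of the service in the question. The ChunkSource interface and its in-memory implementation are made-up stand-ins for the generated client, so the loop can run without a server.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch: ChunkSource stands in for a generated Thrift client
// exposing an assumed method get_chunk(file_name, offset, max_length).
final class ChunkedDownloadDemo {

    interface ChunkSource {
        // Returns at most maxLength bytes starting at offset; an empty buffer means end of file.
        ByteBuffer getChunk(String fileName, long offset, int maxLength);
    }

    // In-memory stand-in for the server side, used only to make the sketch runnable.
    static ChunkSource inMemory(final byte[] content) {
        return new ChunkSource() {
            @Override
            public ByteBuffer getChunk(String fileName, long offset, int maxLength) {
                if (offset >= content.length) {
                    return ByteBuffer.allocate(0);
                }
                int from = (int) offset;
                int to = Math.min(content.length, from + maxLength);
                return ByteBuffer.wrap(Arrays.copyOfRange(content, from, to));
            }
        };
    }

    // Pull the file chunk by chunk; only one chunk is held in a ByteBuffer at a time.
    static byte[] download(ChunkSource source, String fileName, int chunkSize) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream(); // a FileOutputStream in real use
        long offset = 0;
        while (true) {
            ByteBuffer chunk = source.getChunk(fileName, offset, chunkSize);
            int length = chunk.remaining();
            if (length == 0) {
                break; // end of file
            }
            byte[] bytes = new byte[length];
            chunk.get(bytes);
            sink.write(bytes, 0, length);
            offset += length;
        }
        return sink.toByteArray();
    }

    public static void main(String[] args) {
        byte[] original = new byte[10000];
        for (int i = 0; i < original.length; i++) {
            original[i] = (byte) i;
        }
        byte[] copy = download(inMemory(original), "example.bin", 2048);
        System.out.println(Arrays.equals(original, copy)); // prints "true"
    }
}
```

Each call stays a normal synchronous request, so error handling and sequencing are left to Thrift; the price is one round trip per chunk.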

And what do I need in order to read that data?

In your Java Thrift client, you can do:

TTransport transport = new TSocket("yourServerHostNameOrIPAddress", serverPort);
transport.open();
try {
    TProtocol protocol = new TBinaryProtocol(transport);
    ChunkFileResourceThrift.Client client = new ChunkFileResourceThrift.Client(protocol);
    // Blocks until the whole file has been received into memory.
    ByteBuffer buffer = client.get_file(yourFileName);
    // Do whatever you want with the byte buffer
} finally {
    // Close the connection even if the call fails.
    transport.close();
}
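
As for what "whatever you want" can look like if you are new to java.nio, here is a minimal sketch of two common things to do with the returned ByteBuffer: copy its contents into a byte array, and write it to a file. It uses only the JDK. The class and method names (ByteBufferDemo, toBytes, writeToFile) are made up for illustration, and the buffer is built locally as a stand-in for the value returned by client.get_file.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper class; not part of Thrift or of the generated code.
final class ByteBufferDemo {

    // Copy the readable bytes (from position up to limit) into a new array.
    // duplicate() has its own position, so the caller's buffer is left untouched.
    static byte[] toBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    // Write the readable bytes to a file, replacing any existing content.
    static void writeToFile(ByteBuffer buffer, Path target) {
        try (FileOutputStream fos = new FileOutputStream(target.toFile());
             FileChannel channel = fos.getChannel()) {
            ByteBuffer view = buffer.duplicate();
            // write() may consume only part of the buffer, so loop until it is drained.
            while (view.hasRemaining()) {
                channel.write(view);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for: ByteBuffer buffer = client.get_file(yourFileName);
        ByteBuffer buffer = ByteBuffer.wrap("hello thrift".getBytes(StandardCharsets.UTF_8));

        Path target = Files.createTempFile("thrift-demo", ".bin");
        writeToFile(buffer, target);
        System.out.println(Files.size(target) + " bytes written to " + target);
        System.out.println(new String(toBytes(buffer), StandardCharsets.UTF_8));
        Files.delete(target);
    }
}
```

The key idea is that a ByteBuffer has a position and a limit, and the bytes between them are the ones still to be read; remaining() and hasRemaining() report how many are left.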

Is it synchronous or asynchronous?

If you declare a method as oneway in the .thrift file, the client does not wait for a response (such a method must return void); otherwise the call is synchronous. So in your case it is synchronous.

Having to implement low-level network details defeats the purpose of using Thrift. Thrift exists precisely so that you can forget about these details.


I finally succeeded in transferring a file from the server to the client. I extended the Client and Processor classes generated by Thrift, which gave me access to the TProtocol object, which in turn allows sending and receiving arbitrary data streams.
I'm sure my solution is very rough. It would be nice if someone could point out how to implement this in conformity with the Thrift architecture. Could it be done better by implementing a custom Thrift protocol?

client:

package alehro.droid;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift;
import alehro.tcp.ServerSideError;

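class ThriftClientExt extends ChunkFileResourceThrift.Client {

    public ThriftClientExt(TProtocol prot) {
        super(prot);
    }

    // Call after the request has been sent; reads the raw chunks, then the regular reply.
    public void recv_get_file_ext(String get_file_out_path) throws TException,
            IOException, ServerSideError {
        FileOutputStream fos = new FileOutputStream(get_file_out_path);
        try {
            FileChannel channel = fos.getChannel();
            int size = 0;
            // Each chunk is preceded by its size; -1 means end of file, -2 means exception.
            while ((size = iprot_.readI32()) > 0) {
                Logger.me.v("receiving buffer size=" + size);
                // No flip() needed: readBinary() returns a buffer that is ready for reading.
                ByteBuffer out = iprot_.readBinary();
                // write() may consume only part of the buffer, so loop until it is drained.
                while (out.hasRemaining()) {
                    channel.write(out);
                }
            }
            if (size == -2) {
                String msg = iprot_.readString();
                Logger.me.e("Server error: " + msg);
                // TODO: report error to user
            }
        } finally {
            // Closing the stream also closes its channel, even if reading failed.
            fos.close();
        }
        // Read the regular Thrift reply that follows the raw chunks.
        recv_get_file();
    }

}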

server:

package alehro.tcp;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;

import alehro.log.Logger;
import alehro.tcp.ChunkFileResourceThrift.Iface;
import alehro.tcp.ChunkFileResourceThrift.get_file_args;
import alehro.tcp.ChunkFileResourceThrift.get_file_result;

public class ChunkedFileResourceProcessor extends
    ChunkFileResourceThrift.Processor {

public interface IfaceExt extends Iface {
    void get_file_raw(String key, String file_name, TProtocol out)
            throws TException, ServerSideError;
}

final private IfaceExt iface_1;


public ChunkedFileResourceProcessor(IfaceExt iface) {
    super(iface);
    iface_1 = iface;
    // replace generated implementation by my custom one.
    processMap_.put("get_file", new get_file_raw());
}

private class get_file_raw implements ProcessFunction {

    @Override
    public void process(int seqid, TProtocol iprot, TProtocol oprot)
            throws TException {
        get_file_args args = new get_file_args();
        try {
            args.read(iprot);
        } catch (org.apache.thrift.protocol.TProtocolException e) {
            iprot.readMessageEnd();
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.PROTOCOL_ERROR,
                    e.getMessage());
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        iprot.readMessageEnd();
        get_file_result result = new get_file_result();
        try {
            iface_1.get_file_raw(args.key, args.file_name, oprot);
        } catch (ServerSideError ouch) {
            result.ouch = ouch;
        } catch (Throwable th) {
            Logger.me.e("Internal error processing get_file_raw");
            Logger.me.e(th.getMessage());
            Logger.me.e(th);
            org.apache.thrift.TApplicationException x = new org.apache.thrift.TApplicationException(
                    org.apache.thrift.TApplicationException.INTERNAL_ERROR,
                    "Internal error processing get_file");
            oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                    "get_file",
                    org.apache.thrift.protocol.TMessageType.EXCEPTION,
                    seqid));
            x.write(oprot);
            oprot.writeMessageEnd();
            oprot.getTransport().flush();
            return;
        }
        oprot.writeMessageBegin(new org.apache.thrift.protocol.TMessage(
                "get_file", org.apache.thrift.protocol.TMessageType.REPLY,
                seqid));
        result.write(oprot);
        oprot.writeMessageEnd();
        oprot.getTransport().flush();
    }

}

}

server handler:

public class ChunkedFileResourceHandler implements
    ChunkedFileResourceProcessor.IfaceExt {
....
@Override
public void get_file(String key, String file_name) throws TException {
    // stub
    throw new TException("Wrong call. Use get_file_raw instead.");
}

@Override
public void get_file_raw(String key, String file_name, final TProtocol out)
        throws ServerSideError, TException {
    // Catch everything here, mimicking the exception policy of the original get_file.
    try {
        Logger.me.v("Begin get_file_raw");
        UserSession se = accessUserSession(key, "get", 0, 0);
        vali(se != null);
        synchronized (se) {
            String fullPath = "";
            Logger.me.i("get file start: " + file_name);
            String userDir = AppConfig.getUserDir(se.info.email);
            fullPath = userDir + file_name;

            // Stream the file in 2 KB chunks; each chunk is preceded by its length.
            final FileInputStream inputFile = new FileInputStream(fullPath);
            try {
                FileChannel fileChannel = inputFile.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(2048);
                int bytesRead = fileChannel.read(buffer);

                // Logger.me.v("start sending file");
                while (bytesRead != -1) {
                    buffer.flip();
                    // After flip(), remaining() is the number of bytes just read.
                    int length = buffer.remaining();
                    Logger.me.v("sending buffer length=" + length);

                    out.writeI32(length); // read it in client
                    out.writeBinary(buffer); // read it in client
                    buffer.clear();

                    bytesRead = fileChannel.read(buffer);
                }
                out.writeI32(-1); // end-of-file marker, read it in client
            } finally {
                // Closing the stream also closes its channel.
                inputFile.close();
            }

            Logger.me.i("get file end.");
        }
    } catch (TException e) {
        throw e;
    } catch (Throwable e) {
        write_get_file_exception(file_name, e, out);
        return;
    }

}

void write_get_file_exception(String file, Throwable e, final TProtocol out)
        throws TException {
    out.writeI32(-2); // error marker, read it in client
    out.writeString("Exception in get_file_raw: file=" + file
            + ", description=" + e.getMessage());
    Logger.me.e(e);
    Logger.me.i("get file ended with errors: " + e.getMessage());
}
}
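
The framing used by the client and handler above (an i32 length before each chunk, -1 as the end marker, -2 followed by a message on error) can be tried out without Thrift. The sketch below reproduces only that framing, with java.io.DataOutputStream and DataInputStream standing in for TProtocol. The class and method names are made up for illustration, and the bytes it produces are not the Thrift wire encoding (for one thing, writeBinary adds its own length prefix).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo of the length-prefixed chunk framing; no Thrift involved.
final class ChunkFramingDemo {
    static final int END_OF_FILE = -1;
    static final int SERVER_ERROR = -2;

    // Sender: split data into chunks, each preceded by its length, then write the end marker.
    static byte[] encode(byte[] data, int chunkSize) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (int pos = 0; pos < data.length; pos += chunkSize) {
                int length = Math.min(chunkSize, data.length - pos);
                out.writeInt(length);
                out.write(data, pos, length);
            }
            out.writeInt(END_OF_FILE);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Sender, error case: the error marker followed by a message.
    static byte[] encodeError(String message) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(SERVER_ERROR);
            out.writeUTF(message);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Receiver: read chunks until a non-positive marker arrives.
    static byte[] decode(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            ByteArrayOutputStream file = new ByteArrayOutputStream();
            int size;
            while ((size = in.readInt()) > 0) {
                byte[] chunk = new byte[size];
                in.readFully(chunk);
                file.write(chunk, 0, size);
            }
            if (size == SERVER_ERROR) {
                throw new IllegalStateException("Server error: " + in.readUTF());
            }
            return file.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] original = new byte[5000];
        Arrays.fill(original, (byte) 42);
        byte[] wire = encode(original, 2048);
        // 3 chunks (2048 + 2048 + 904 bytes), each with a 4-byte length, plus a 4-byte end marker
        System.out.println(wire.length); // prints 5016
        System.out.println(Arrays.equals(original, decode(wire))); // prints true
    }
}
```

Note that a chunk length of 0 would also end the receiving loop, so the sender must never emit an empty chunk before the end marker.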