Java - Apache Thrift Transport File by nonblocking mode

有幸接觸Thrift這個RPC Framework,主要是能夠建置資料微量傳輸的架構,裡面提供了多種

語言的支援,也就是說您可以利用Java或C++等語言搭配thrift實作出您的資料傳輸,而您必須

先定義一抽象的Service,透過IDL語言來撰寫,再利用thrift.exe編譯器轉成相關語言的程式

檔。

這個自動生成的檔案算是一個抽象介面,並且已包裝了與thrift底層溝通的read、write程式。

以Java來說,您必須定義一個類別來實作該類別檔內的Iface,並且覆寫相關您在IDL內定義

的方法或類別。如此一來,Client才可以呼叫相關的Service method來達到兩者之間的資料

傳輸。一般只需要撰寫邏輯相關的程式即可,並且可以使用thrift所提供的Transport、

Protocol、Server mode等,在您撰寫Client、Server時決定要採用哪個建構方式。

PS. 在此不特別介紹IDL語言及thrift的理論框架等,相關資料網路上可以找到很多囉


在這邊要提出的例子是,如何利用thrift來實作出檔案的傳輸,由Server端傳輸給Client端,

並且在當下的傳輸可限定特定大小的資料區塊,來達到批次傳輸的目的。


一、定義IDL語言(.thrift檔)
struct FileChunk {
   1: binary data
   2: i64 remaining
}

service StreamFileService {    
   FileChunk getBytes(1:string ClientName, 2: i64 offset, 3: i32 size); 
   i64 getFileSize();
}

service名稱為StreamFileService,裡面會建構一個類別FileChunk並且定義了兩個成員變數,

這兩個變數目的為紀錄Server端當下所進行到的檔案傳輸情況,data表示當下bytes陣列檔案

區塊,在Client端將write到FileOutputStream內,進行檔案的輸出串流;remaining表示此次

在Server端傳輸的區塊大小。

getBytes所取到的實體,即可以得到當下的data、remaining等成員變數的值

getFileSize為取得當下要傳輸到Client端的檔案大小

二、編譯.thrift檔

指令如下:

thrift-0.9.2.exe -gen java ./src/com/zh/exam/StreamFileService.thrift

此時會產生一個目錄



三、Server端 - StreamFileServer
private void start() {
     try {
      TNonblockingServerTransport theServerSocket = new TNonblockingServerSocket(7911);
      @SuppressWarnings({ "rawtypes", "unchecked" })
      StreamFileService.Processor theProcessor = 
               new StreamFileService.Processor(new StreamFileServiceImpl());
      TServer theServer = new TNonblockingServer(new TNonblockingServer.Args(theServerSocket).processor(theProcessor));
      System.out.println("Server starting on port 7911...");
      theServer.serve();
     } catch (TTransportException e) {
         e.printStackTrace();
     }
 }
 
 public static void main(String[] args) {
     StreamFileServer theFileServer = new StreamFileServer();
     theFileServer.start();
 }

Server端採用的非阻塞式IO傳輸模式,達到以單一執行程序可以一次處理多個請求的效果,

如同Java的NIO,也類似於實作一個多執行緒處理,但在這邊利用非阻塞式IO也可以達到同

樣的效果,但只需要一個執行程序即可。以thrift框架來實作的話,在程式的撰寫上比起NIO

來說簡潔許多,由於很多東西都被Service及thrift底層處理掉了,因此只需要將心力花在程

式邏輯上即可。

四、Server端 Service實作 - StreamFileServiceImpl
public class StreamFileServiceImpl implements StreamFileService.Iface {
   File theFile = new File("example.doc");
 
   public FileChunk getBytes(String clientName, long offset, int size)throws TException {
       FileChunk chunk = new FileChunk();
       FileInputStream stream = null;
       try {
         long fileSize = getFileSize();
         int  partSize = 1024 * size;
         long chunkSize = 0;
         byte fileContent[] = new byte[(int) fileSize];

         stream = new FileInputStream(theFile);         
         stream.read(fileContent, 0, (int)(fileContent.length));
         chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;
         
         System.out.println(clientName+"---offset=>"+(offset/1024)+" KB, trans. "+(chunkSize/1024)+" KB.");
         
         byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));
         ByteBuffer byteBuf = ByteBuffer.wrap(part);  
         
         chunk.data   = byteBuf;
         chunk.remaining = chunkSize;
         Thread.sleep(100);
     } catch (FileNotFoundException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     } catch (Exception e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     } finally{
         try {
            stream.close();
         } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
         }
     }
     return chunk;
 }

 @Override
 public long getFileSize() throws TException {
     // TODO Auto-generated method stub
     return theFile.length();
 }
}

Service的實作為Client端呼叫Server端提供的服務的邏輯處理,在這邊主要的方法getBytes

,將傳入當下的offset及size(傳輸給client端的區塊大小)。offset記錄著已經傳到了檔案的哪個

位置,以byte陣列而言,如此一來才知道在方法內要取得哪一段的bytes資料作傳輸。

下面這行會計算目前要傳輸的區塊大小,假如已經快要傳送完了,此時不可以再傳送client

端設定的size,需以實際剩下的為主。

chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;

緊接著,再進行bytes的copy

byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));

最後,將取得的區塊bytes array轉換成ByteBuffer,寫入至FileChunk的實體內回傳給Client

ByteBuffer byteBuf = ByteBuffer.wrap(part);
chunk.data = byteBuf;
chunk.remaining = chunkSize;

五、Client端 - StreamFileClient
public class StreamFileClient {
   private int fileChunkSize = 4;
   private void invoke() {
   TTransport theClientTransport = null;
   FileChannel theFileChannel = null;
   FileOutputStream outputStream = null;
   double FileSize = 0;
   String clientName = "BEN-PC-1";
   String filePath = "example1.doc";
     try { 
         theClientTransport = new TFramedTransport(new TSocket("localhost", 7911));
         TProtocol theProtocol = new TBinaryProtocol(theClientTransport);
         StreamFileService.Client theClient = new StreamFileService.Client(theProtocol);
         theClientTransport.open();
 
         File f = new File(filePath);
         f.createNewFile();
         outputStream = new FileOutputStream(f);
         long currentPosition = 0;
 
         theFileChannel = outputStream.getChannel();
         FileSize = theClient.getFileSize()/1024.0;
         boolean again = true;
         do {
             FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);
             currentPosition += chunk.remaining;
             double pgress = currentPosition/1024.0;
             if (chunk.remaining == 0)
                 again = false;
             else{
              System.out.println("Total Sizes "+FileSize+" KB. " +
                "Current Transport Progress "+pgress+" KB.");
              theFileChannel.write(chunk.data);
             } 
         } while (again);
         
     } catch (TTransportException e) {
         e.printStackTrace();
     } catch (TException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
     } finally{
      try {
         theFileChannel.close();
         outputStream.flush();
         outputStream.close();
      } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }    
      theClientTransport.close();
    }
 }
 
 public static void main(String[] args) {
     StreamFileClient theClient = new StreamFileClient();
     theClient.invoke();
 }
} 

Client端由於搭配Server端的非阻塞式IO傳輸,因此需建構出TFramedTransport的實體為進

行傳輸的方式,在傳輸協定上是採用thrift的TBinaryProtocol。

需要注意的為do ... while迴圈內所做的事

FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);

當下將取得chunk的實體,記錄著offset及chunk size,此時進行offset的更新

currentPosition += chunk.remaining;

如此一來,下次呼叫getBytes時,在Service那邊將從新的offset開始copy chunk size大小的

資料。

當if (chunk.remaining == 0)成立時,表示offset已經在檔案的末端了,此時所取得的remaining

會等於0,程式將結束,也表示檔案已經傳輸成功,並且當案將建立在client端所指定的目錄。

相對地,如果remaining有值,表示bytebuffer有資料要寫入至檔案輸出串流內

theFileChannel.write(chunk.data);


執行結果





參考資料:http://stackoverflow.com/questions/20960541/file-transport-between-server-client

這個範例概念是在找尋相關資料所爬到的,不過當初在跑參考資料的範例時無法work,

因此就修改了原範例囉!

留言