Java - Apache Thrift Transport File by nonblocking mode

有幸接觸Thrift這個RPC Framework,主要是能夠建置資料微量傳輸的架構,裡面提供了多種

語言的支援,也就是說您可以利用Java或C++等語言搭配thrift實作出您的資料傳輸,而您必須

先定義一抽象的Service,透過IDL語言來撰寫,再利用thrift.exe編譯器轉成相關語言的程式

檔。

這個自動生成的檔案算是一個抽象介面,並且已包裝了與thrift底層溝通的read、write程式。

以Java來說,您必須定義一個類別來實作該類別檔內的Iface,並且覆寫相關您在IDL內定義

的方法或類別。如此一來,Client才可以呼叫相關的Service method來達到兩者之間的資料

傳輸。一般只需要撰寫邏輯相關的程式即可,並且可以使用thrift所提供的Transport、

Protocol、Server mode等,在您撰寫Client、Server時決定要採用哪個建構方式。

PS. 在此不特別介紹IDL語言及thrift的理論框架等,相關資料網路上可以找到很多囉


在這邊要提出的例子是,如何利用thrift來實作出檔案的傳輸,由Server端傳輸給Client端,

並且在當下的傳輸可限定特定大小的資料區塊,來達到批次傳輸的目的。


一、定義IDL語言(.thrift檔)
  1. struct FileChunk {
  2. 1: binary data
  3. 2: i64 remaining
  4. }
  5.  
  6. service StreamFileService {
  7. FileChunk getBytes(1:string ClientName, 2: i64 offset, 3: i32 size);
  8. i64 getFileSize();
  9. }

service名稱為StreamFileService,裡面會建構一個類別FileChunk並且定義了兩個成員變數,

這兩個變數目的為紀錄Server端當下所進行到的檔案傳輸情況,data表示當下bytes陣列檔案

區塊,在Client端將write到FileOutputStream內,進行檔案的輸出串流;remaining表示此次

在Server端傳輸的區塊大小。

getBytes所取到的實體,即可以得到當下的data、remaining等成員變數的值

getFileSize為取得當下要傳輸到Client端的檔案大小

二、編譯.thrift檔

指令如下:

thrift-0.9.2.exe -gen java ./src/com/zh/exam/StreamFileService.thrift

此時會產生一個目錄



三、Server端 - StreamFileServer
  1. private void start() {
  2. try {
  3. TNonblockingServerTransport theServerSocket = new TNonblockingServerSocket(7911);
  4. @SuppressWarnings({ "rawtypes", "unchecked" })
  5. StreamFileService.Processor theProcessor =
  6. new StreamFileService.Processor(new StreamFileServiceImpl());
  7. TServer theServer = new TNonblockingServer(new TNonblockingServer.Args(theServerSocket).processor(theProcessor));
  8. System.out.println("Server starting on port 7911...");
  9. theServer.serve();
  10. } catch (TTransportException e) {
  11. e.printStackTrace();
  12. }
  13. }
  14. public static void main(String[] args) {
  15. StreamFileServer theFileServer = new StreamFileServer();
  16. theFileServer.start();
  17. }

Server端採用的非阻塞式IO傳輸模式,達到以單一執行程序可以一次處理多個請求的效果,

如同Java的NIO,也類似於實作一個多執行緒處理,但在這邊利用非阻塞式IO也可以達到同

樣的效果,但只需要一個執行程序即可。以thrift框架來實作的話,在程式的撰寫上比起NIO

來說簡潔許多,由於很多東西都被Service及thrift底層處理掉了,因此只需要將心力花在程

式邏輯上即可。

四、Server端 Service實作 - StreamFileServiceImpl
  1. public class StreamFileServiceImpl implements StreamFileService.Iface {
  2. File theFile = new File("example.doc");
  3. public FileChunk getBytes(String clientName, long offset, int size)throws TException {
  4. FileChunk chunk = new FileChunk();
  5. FileInputStream stream = null;
  6. try {
  7. long fileSize = getFileSize();
  8. int partSize = 1024 * size;
  9. long chunkSize = 0;
  10. byte fileContent[] = new byte[(int) fileSize];
  11. stream = new FileInputStream(theFile);
  12. stream.read(fileContent, 0, (int)(fileContent.length));
  13. chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;
  14. System.out.println(clientName+"---offset=>"+(offset/1024)+" KB, trans. "+(chunkSize/1024)+" KB.");
  15. byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));
  16. ByteBuffer byteBuf = ByteBuffer.wrap(part);
  17. chunk.data = byteBuf;
  18. chunk.remaining = chunkSize;
  19. Thread.sleep(100);
  20. } catch (FileNotFoundException e) {
  21. // TODO Auto-generated catch block
  22. e.printStackTrace();
  23. } catch (Exception e) {
  24. // TODO Auto-generated catch block
  25. e.printStackTrace();
  26. } finally{
  27. try {
  28. stream.close();
  29. } catch (IOException e) {
  30. // TODO Auto-generated catch block
  31. e.printStackTrace();
  32. }
  33. }
  34. return chunk;
  35. }
  36. @Override
  37. public long getFileSize() throws TException {
  38. // TODO Auto-generated method stub
  39. return theFile.length();
  40. }
  41. }

Service的實作為Client端呼叫Server端提供的服務的邏輯處理,在這邊主要的方法getBytes

,將傳入當下的offset及size(傳輸給client端的區塊大小)。offset記錄著已經傳到了檔案的哪個

位置,以byte陣列而言,如此一來才知道在方法內要取得哪一段的bytes資料作傳輸。

下面這行會計算目前要傳輸的區塊大小,假如已經快要傳送完了,此時不可以再傳送client

端設定的size,需以實際剩下的為主。

chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;

緊接著,再進行bytes的copy

byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));

最後,將取得的區塊bytes array轉換成ByteBuffer,寫入至FileChunk的實體內回傳給Client

ByteBuffer byteBuf = ByteBuffer.wrap(part);
chunk.data = byteBuf;
chunk.remaining = chunkSize;

五、Client端 - StreamFileClient
  1. public class StreamFileClient {
  2. private int fileChunkSize = 4;
  3. private void invoke() {
  4. TTransport theClientTransport = null;
  5. FileChannel theFileChannel = null;
  6. FileOutputStream outputStream = null;
  7. double FileSize = 0;
  8. String clientName = "BEN-PC-1";
  9. String filePath = "example1.doc";
  10. try {
  11. theClientTransport = new TFramedTransport(new TSocket("localhost", 7911));
  12. TProtocol theProtocol = new TBinaryProtocol(theClientTransport);
  13. StreamFileService.Client theClient = new StreamFileService.Client(theProtocol);
  14. theClientTransport.open();
  15. File f = new File(filePath);
  16. f.createNewFile();
  17. outputStream = new FileOutputStream(f);
  18. long currentPosition = 0;
  19. theFileChannel = outputStream.getChannel();
  20. FileSize = theClient.getFileSize()/1024.0;
  21. boolean again = true;
  22. do {
  23. FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);
  24. currentPosition += chunk.remaining;
  25. double pgress = currentPosition/1024.0;
  26. if (chunk.remaining == 0)
  27. again = false;
  28. else{
  29. System.out.println("Total Sizes "+FileSize+" KB. " +
  30. "Current Transport Progress "+pgress+" KB.");
  31. theFileChannel.write(chunk.data);
  32. }
  33. } while (again);
  34. } catch (TTransportException e) {
  35. e.printStackTrace();
  36. } catch (TException e) {
  37. // TODO Auto-generated catch block
  38. e.printStackTrace();
  39. } catch (IOException e) {
  40. // TODO Auto-generated catch block
  41. e.printStackTrace();
  42. } finally{
  43. try {
  44. theFileChannel.close();
  45. outputStream.flush();
  46. outputStream.close();
  47. } catch (IOException e) {
  48. // TODO Auto-generated catch block
  49. e.printStackTrace();
  50. }
  51. theClientTransport.close();
  52. }
  53. }
  54. public static void main(String[] args) {
  55. StreamFileClient theClient = new StreamFileClient();
  56. theClient.invoke();
  57. }
  58. }

Client端由於搭配Server端的非阻塞式IO傳輸,因此需建構出TFramedTransport的實體為進

行傳輸的方式,在傳輸協定上是採用thrift的TBinaryProtocol。

需要注意的為do ... while迴圈內所做的事

FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);

當下將取得chunk的實體,記錄著offset及chunk size,此時進行offset的更新

currentPosition += chunk.remaining;

如此一來,下次呼叫getBytes時,在Service那邊將從新的offset開始copy chunk size大小的

資料。

當if (chunk.remaining == 0)成立時,表示offset已經在檔案的末端了,此時所取得的remaining

會等於0,程式將結束,也表示檔案已經傳輸成功,並且當案將建立在client端所指定的目錄。

相對地,如果remaining有值,表示bytebuffer有資料要寫入至檔案輸出串流內

theFileChannel.write(chunk.data);


執行結果





參考資料:http://stackoverflow.com/questions/20960541/file-transport-between-server-client

這個範例概念是在找尋相關資料所爬到的,不過當初在跑參考資料的範例時無法work,

因此就修改了原範例囉!

留言