有幸接觸Thrift這個RPC Framework,主要是能夠建置資料微量傳輸的架構,裡面提供了多種
語言的支援,也就是說您可以利用Java或C++等語言搭配thrift實作出您的資料傳輸,而您必須
先定義一抽象的Service,透過IDL語言來撰寫,再利用thrift.exe編譯器轉成相關語言的程式
檔。
這個自動生成的檔案算是一個抽象介面,並且已包裝了與thrift底層溝通的read、write程式。
以Java來說,您必須定義一個類別來實作該類別檔內的Iface,並且覆寫相關您在IDL內定義
的方法或類別。如此一來,Client才可以呼叫相關的Service method來達到兩者之間的資料
傳輸。一般只需要撰寫邏輯相關的程式即可,並且可以使用thrift所提供的Transport、
Protocol、Server mode等,在您撰寫Client、Server時決定要採用哪個建構方式。
PS. 在此不特別介紹IDL語言及thrift的理論框架等,相關資料網路上可以找到很多囉
在這邊要提出的例子是,如何利用thrift來實作出檔案的傳輸,由Server端傳輸給Client端,
並且在當下的傳輸可限定特定大小的資料區塊,來達到批次傳輸的目的。
一、定義IDL語言(.thrift檔)
service名稱為StreamFileService,裡面會建構一個類別FileChunk並且定義了兩個成員變數,
這兩個變數目的為紀錄Server端當下所進行到的檔案傳輸情況,data表示當下bytes陣列檔案
區塊,在Client端將write到FileOutputStream內,進行檔案的輸出串流;remaining表示此次
在Server端傳輸的區塊大小。
getBytes所取到的實體,即可以得到當下的data、remaining等成員變數的值
getFileSize為取得當下要傳輸到Client端的檔案大小
二、編譯.thrift檔
指令如下:
thrift-0.9.2.exe -gen java ./src/com/zh/exam/StreamFileService.thrift
此時會產生一個目錄
Server端採用的非阻塞式IO傳輸模式,達到以單一執行程序可以一次處理多個請求的效果,
如同Java的NIO,也類似於實作一個多執行緒處理,但在這邊利用非阻塞式IO也可以達到同
樣的效果,但只需要一個執行程序即可。以thrift框架來實作的話,在程式的撰寫上比起NIO
來說簡潔許多,由於很多東西都被Service及thrift底層處理掉了,因此只需要將心力花在程
式邏輯上即可。
四、Server端 Service實作 - StreamFileServiceImpl
Service的實作為Client端呼叫Server端提供的服務的邏輯處理,在這邊主要的方法getBytes
,將傳入當下的offset及size(傳輸給client端的區塊大小)。offset記錄著已經傳到了檔案的哪個
位置,以byte陣列而言,如此一來才知道在方法內要取得哪一段的bytes資料作傳輸。
下面這行會計算目前要傳輸的區塊大小,假如已經快要傳送完了,此時不可以再傳送client
端設定的size,需以實際剩下的為主。
chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;
緊接著,再進行bytes的copy
byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));
最後,將取得的區塊bytes array轉換成ByteBuffer,寫入至FileChunk的實體內回傳給Client
ByteBuffer byteBuf = ByteBuffer.wrap(part);
chunk.data = byteBuf;
chunk.remaining = chunkSize;
五、Client端 - StreamFileClient
Client端由於搭配Server端的非阻塞式IO傳輸,因此需建構出TFramedTransport的實體為進
行傳輸的方式,在傳輸協定上是採用thrift的TBinaryProtocol。
需要注意的為do ... while迴圈內所做的事
FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);
當下將取得chunk的實體,記錄著offset及chunk size,此時進行offset的更新
currentPosition += chunk.remaining;
如此一來,下次呼叫getBytes時,在Service那邊將從新的offset開始copy chunk size大小的
資料。
當if (chunk.remaining == 0)成立時,表示offset已經在檔案的末端了,此時所取得的remaining
會等於0,程式將結束,也表示檔案已經傳輸成功,並且當案將建立在client端所指定的目錄。
相對地,如果remaining有值,表示bytebuffer有資料要寫入至檔案輸出串流內
theFileChannel.write(chunk.data);
執行結果
參考資料:http://stackoverflow.com/questions/20960541/file-transport-between-server-client
這個範例概念是在找尋相關資料所爬到的,不過當初在跑參考資料的範例時無法work,
因此就修改了原範例囉!
語言的支援,也就是說您可以利用Java或C++等語言搭配thrift實作出您的資料傳輸,而您必須
先定義一抽象的Service,透過IDL語言來撰寫,再利用thrift.exe編譯器轉成相關語言的程式
檔。
這個自動生成的檔案算是一個抽象介面,並且已包裝了與thrift底層溝通的read、write程式。
以Java來說,您必須定義一個類別來實作該類別檔內的Iface,並且覆寫相關您在IDL內定義
的方法或類別。如此一來,Client才可以呼叫相關的Service method來達到兩者之間的資料
傳輸。一般只需要撰寫邏輯相關的程式即可,並且可以使用thrift所提供的Transport、
Protocol、Server mode等,在您撰寫Client、Server時決定要採用哪個建構方式。
PS. 在此不特別介紹IDL語言及thrift的理論框架等,相關資料網路上可以找到很多囉
在這邊要提出的例子是,如何利用thrift來實作出檔案的傳輸,由Server端傳輸給Client端,
並且在當下的傳輸可限定特定大小的資料區塊,來達到批次傳輸的目的。
一、定義IDL語言(.thrift檔)
struct FileChunk { 1: binary data 2: i64 remaining } service StreamFileService { FileChunk getBytes(1:string ClientName, 2: i64 offset, 3: i32 size); i64 getFileSize(); }
service名稱為StreamFileService,裡面會建構一個類別FileChunk並且定義了兩個成員變數,
這兩個變數目的為紀錄Server端當下所進行到的檔案傳輸情況,data表示當下bytes陣列檔案
區塊,在Client端將write到FileOutputStream內,進行檔案的輸出串流;remaining表示此次
在Server端傳輸的區塊大小。
getBytes所取到的實體,即可以得到當下的data、remaining等成員變數的值
getFileSize為取得當下要傳輸到Client端的檔案大小
二、編譯.thrift檔
指令如下:
thrift-0.9.2.exe -gen java ./src/com/zh/exam/StreamFileService.thrift
此時會產生一個目錄
三、Server端 - StreamFileServer
private void start() {
try {
TNonblockingServerTransport theServerSocket = new TNonblockingServerSocket(7911);
@SuppressWarnings({ "rawtypes", "unchecked" })
StreamFileService.Processor theProcessor =
new StreamFileService.Processor(new StreamFileServiceImpl());
TServer theServer = new TNonblockingServer(new TNonblockingServer.Args(theServerSocket).processor(theProcessor));
System.out.println("Server starting on port 7911...");
theServer.serve();
} catch (TTransportException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
StreamFileServer theFileServer = new StreamFileServer();
theFileServer.start();
}
Server端採用的非阻塞式IO傳輸模式,達到以單一執行程序可以一次處理多個請求的效果,
如同Java的NIO,也類似於實作一個多執行緒處理,但在這邊利用非阻塞式IO也可以達到同
樣的效果,但只需要一個執行程序即可。以thrift框架來實作的話,在程式的撰寫上比起NIO
來說簡潔許多,由於很多東西都被Service及thrift底層處理掉了,因此只需要將心力花在程
式邏輯上即可。
四、Server端 Service實作 - StreamFileServiceImpl
public class StreamFileServiceImpl implements StreamFileService.Iface {
File theFile = new File("example.doc");
public FileChunk getBytes(String clientName, long offset, int size)throws TException {
FileChunk chunk = new FileChunk();
FileInputStream stream = null;
try {
long fileSize = getFileSize();
int partSize = 1024 * size;
long chunkSize = 0;
byte fileContent[] = new byte[(int) fileSize];
stream = new FileInputStream(theFile);
stream.read(fileContent, 0, (int)(fileContent.length));
chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;
System.out.println(clientName+"---offset=>"+(offset/1024)+" KB, trans. "+(chunkSize/1024)+" KB.");
byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));
ByteBuffer byteBuf = ByteBuffer.wrap(part);
chunk.data = byteBuf;
chunk.remaining = chunkSize;
Thread.sleep(100);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
try {
stream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return chunk;
}
@Override
public long getFileSize() throws TException {
// TODO Auto-generated method stub
return theFile.length();
}
}
Service的實作為Client端呼叫Server端提供的服務的邏輯處理,在這邊主要的方法getBytes
,將傳入當下的offset及size(傳輸給client端的區塊大小)。offset記錄著已經傳到了檔案的哪個
位置,以byte陣列而言,如此一來才知道在方法內要取得哪一段的bytes資料作傳輸。
下面這行會計算目前要傳輸的區塊大小,假如已經快要傳送完了,此時不可以再傳送client
端設定的size,需以實際剩下的為主。
chunkSize = (fileSize - offset <= partSize) ? fileSize - offset : partSize;
緊接著,再進行bytes的copy
byte part[] = Arrays.copyOfRange(fileContent, (int)offset, (int)(offset + chunkSize));
最後,將取得的區塊bytes array轉換成ByteBuffer,寫入至FileChunk的實體內回傳給Client
ByteBuffer byteBuf = ByteBuffer.wrap(part);
chunk.data = byteBuf;
chunk.remaining = chunkSize;
五、Client端 - StreamFileClient
public class StreamFileClient {
private int fileChunkSize = 4;
private void invoke() {
TTransport theClientTransport = null;
FileChannel theFileChannel = null;
FileOutputStream outputStream = null;
double FileSize = 0;
String clientName = "BEN-PC-1";
String filePath = "example1.doc";
try {
theClientTransport = new TFramedTransport(new TSocket("localhost", 7911));
TProtocol theProtocol = new TBinaryProtocol(theClientTransport);
StreamFileService.Client theClient = new StreamFileService.Client(theProtocol);
theClientTransport.open();
File f = new File(filePath);
f.createNewFile();
outputStream = new FileOutputStream(f);
long currentPosition = 0;
theFileChannel = outputStream.getChannel();
FileSize = theClient.getFileSize()/1024.0;
boolean again = true;
do {
FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);
currentPosition += chunk.remaining;
double pgress = currentPosition/1024.0;
if (chunk.remaining == 0)
again = false;
else{
System.out.println("Total Sizes "+FileSize+" KB. " +
"Current Transport Progress "+pgress+" KB.");
theFileChannel.write(chunk.data);
}
} while (again);
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
try {
theFileChannel.close();
outputStream.flush();
outputStream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
theClientTransport.close();
}
}
public static void main(String[] args) {
StreamFileClient theClient = new StreamFileClient();
theClient.invoke();
}
}
Client端由於搭配Server端的非阻塞式IO傳輸,因此需建構出TFramedTransport的實體為進
行傳輸的方式,在傳輸協定上是採用thrift的TBinaryProtocol。
需要注意的為do ... while迴圈內所做的事
FileChunk chunk = theClient.getBytes(clientName, currentPosition, fileChunkSize);
當下將取得chunk的實體,記錄著offset及chunk size,此時進行offset的更新
currentPosition += chunk.remaining;
如此一來,下次呼叫getBytes時,在Service那邊將從新的offset開始copy chunk size大小的
資料。
當if (chunk.remaining == 0)成立時,表示offset已經在檔案的末端了,此時所取得的remaining
會等於0,程式將結束,也表示檔案已經傳輸成功,並且當案將建立在client端所指定的目錄。
相對地,如果remaining有值,表示bytebuffer有資料要寫入至檔案輸出串流內
theFileChannel.write(chunk.data);
執行結果
這個範例概念是在找尋相關資料所爬到的,不過當初在跑參考資料的範例時無法work,
因此就修改了原範例囉!
留言
張貼留言