2007年12月18日 星期二

GridGain 程式開發(8) - Gridify With Session

Grid Task Session 運作方式與目的

此範例要介紹在 Grid Task 與 Grdi Job 之間,可以用來作為溝通媒介的「Grid Task Session」。

運作的機制如下:
  1. Grid Task Session 顧名思義,可以知道是屬於 Grid Task 而非 Grid Job

  2. 在 Grid Task Session 中可以加入 attribute 與 checkpoint 資訊,這些則是屬於 Grid Job 的資訊;而 Grid Job 加入 attribute 的用意通常作為與 Grid Task 溝通之用,而 checkpoint 則是記錄執行到何處,以便 job 執行失敗時進行 failover

  3. 每個 Grid Job 都可以檢視其他 job 所擁有的 attribute 與 checkpoint 相關資訊,因此可作為 Grid Job 之間溝通的橋樑

  4. 當然,Grid Task Session 也是可以被 Grid Task,以及所有該 task 拆出來的 Grid Job 所檢視


AspectJ AOP 設定

這個部分跟前面幾篇都是一樣的,因此就不贅述了!


範例程式

了解 Grid Task Session 的運作方式與目的之後,以下就使用程式進行說明:

GridifyHelloWorldSessionExample.java
import org.gridgain.grid.*;
import org.gridgain.grid.gridify.*;

public class GridifyHelloWorldSessionExample {

/**
* @param args
*/

public static void main(String[] args) throws GridException {
GridFactory.start();

try {
sayIt("Hello(1) this(2) world(3)");
System.out.println("\n=========== local 執行完畢 ===========\n");
} finally {
GridFactory.stop(true);
}
} //end main

/**
* 只有第一次呼叫此 method 才會被 GridGain 當作 GridTask 分成 GridJobs
* 之後僅會執行 function 內的動作(印出結果)
* */

@Gridify(taskClass=GridifyHelloWorldSessionTask.class, timeout=3000)
public static void sayIt(String arg) {
System.out.println("\n=========== remote 開始執行 ===========");
System.out.println(">>>> 印出 " + arg);
System.out.println("=========== remote 執行完畢 ===========\n");
} //end sayIt
}
GridifyHelloWorldSessionTask.java
import org.gridgain.grid.*;
import org.gridgain.grid.resources.*;
import org.gridgain.grid.gridify.*;
import java.util.*;
import java.io.*;

//處理 Grid Task 拆開成多個 Grid Jobs 的工作
public class GridifyHelloWorldSessionTask extends GridTaskSplitAdapter<GridifyArgument> {
//注入(inject) GridTaskSessionResource
//不論是 task 或是分出去的 jobs 都可以看見此 session 與裡面的內容(attribute、checkpoint)
@GridTaskSessionResource
private GridTaskSession ses = null;

@Override
protected Collection<? extends GridJob> split(int gridSize, GridifyArgument arg) throws GridException {
//取得傳入的字串,並以空白為間隔,拆為陣列
String[] words = ((String)arg.getMethodParameters()[0]).split(" ");

//產生 Grid Job collection
List<GridJobAdapter<String>> jobs = new ArrayList<GridJobAdapter<String>>(words.length);
for(String word : words) {
jobs.add(new GridJobAdapter<String>(word) {

//注入(inject) GridJobIdResource
//雖然乍看之下 jobID 被設定為 null,但經過注入 @GridJobResource 後
//就會被實際的 GridJobId 給取代(範例:92a5c837-5d64-413c-b254-1a79038f907d)
@GridJobIdResource
private UUID jobID = null;

/**
* 執行工作:
* <ol>
* <li>遠端執行所指定的工作(印出部分字串)</li>
* <li>將所印出的部分字串當作 attribute 加入 task session</li>
* <li>等待其他 Grid Job 也將其所取得的部分字串當作 attribute 加入 task session</li>
* <li>當所有 Grid Job 的 attribute 都設定完後,印出蒐集到的全部字串</li>
* </ol>
*/

@Override
public Serializable execute() throws GridException {
//印出一開始由 task 分配到的部分字串(GridGain 不會將其視為 Grid Task 處理)
GridifyHelloWorldSessionExample.sayIt(this.getArgument());

//將 jobID attribute 加入 GridTaskSession
//因此在 task 與所有 jobs 中都可以看見此 attribute 的內容
ses.setAttribute(jobID, this.getArgument());
try {
//等待所有屬於 task 中的 job 將 attribute 設定到 session 上
for(GridJobSibling sibling : ses.getJobSiblings()) {
//檢查 attribute 是否在 session 中 (Null 代表不存在)
if(ses.waitForAttribute(sibling.getJobId()) == null)
throw new GridException("Failed to get session attribute from job " + sibling.getJobId());
} //end loop
} catch(InterruptedException e) {
throw new GridException("Got interrupted while waiting for session attribute.", e);
}

StringBuilder msg = new StringBuilder();
for(Serializable arg : ses.getAttributes().values())
msg.append(arg).append(' ');

//印出 session 中所有字串的內容(GridGain 不會將其視為 Grid Task 處理)
GridifyHelloWorldSessionExample.sayIt(msg.toString());

return null;
}
});
} //end loop

return jobs;
} //end split

@Override
public Object reduce(List<GridJobResult> arg0) throws GridException {
return null;
} //end reduce
}


重要使用須知

上述的程式碼看起來沒什麼大問題,不過有一個相當重要的地方必須注意! 就是在 GridTaskSession.waitForAttribute(Serializable) 這個 method 上。

仔細注意此 method 所在的位置,位於「Grid Job Sibling 集合的 for loop 中」,加上參數所代入的是 GridJobSibling.getJobId();因此上面那段程式碼的意思為「確定每個 GridJobSibling 都將 JobID 這個 attribute 設定到 session 中」。

這樣說來似乎也覺得沒有什麼,不過重要的是:
若 waitForAttribute(Serializeable) method 中放的不是 GridJobId,而是其他的值呢? 此時該如何運作?
這樣有辦法進行嗎? 目前根據我的理解應該是不行,以下舉個範例:

假設 GridTaskA 拆成 GridJobA + GridJobB + GridJobC,假設在 remote gird node 在處理 GridJobA 的情況下,根據上面那一段 for loop 程式,會執行以下工作:
  1. 取得 GridJobSibling,分別是 GridJobB 與 GridJobC

  2. 指定 session 等待 GridJobB 將 JobID attribute 的加入,此時會等待其他 remote grid node 將 GridJobB 將其 JobID 設定進 session 中

  3. 確定 GridJobB 設定好 attribute

  4. 指定 session 等待 GridJobC 將 JobID attribute 的加入,此時會等待其他 remote grid node 將 GridJobC 將其 JobID 設定進 session 中

  5. Sibling 全部執行結束!

這樣看起來似乎還是沒什麼問題,但是重點來了! 在第二個與第四個流程,代入 GridTaskSession.waitForAttribute 的參數是??

沒錯! 就是 GridJobSibling.getJobId(),但這樣又有啥問題呢?

問題可大了! 這代表「使用者必須想辦法取得 Sibling 的 GridJobID」!!

因此問題慢慢釐清了,由於 GridJobSibling 提供了 getId() method,因此可以透過此 method 取得 GridJobID,所以沒有問題!

但是! 若是想要改用自行設定的值而非 JobID 呢?
恩,很抱歉! GridJobSibling 並沒有提供其他方法可以取得使用者所自訂的值!

因此,GridGain 在官方網站的文件上提到「可以透過 waitForAttribute 這種方式,讓多個 GridJob 在某種程度上達成有順序的執行」,其實還是有其限制在的;也就是說,若要判斷 session 中的 attribute 是由哪一個 GridJob 所設定的,那就必須使用 GridJobId 作為 attribute 的 key,因此在使用上還是需要多注意囉!

最後,看完上面的囉哩八嗦,應該可以了解 Grid Task Session 的使用方式以及注意事項了!

沒有留言:

張貼留言