此範例要介紹在 Grid Task 與 Grdi Job 之間,可以用來作為溝通媒介的「Grid Task Session」。
運作的機制如下:
- Grid Task Session 顧名思義,可以知道是屬於 Grid Task 而非 Grid Job
- 在 Grid Task Session 中可以加入 attribute 與 checkpoint 資訊,這些則是屬於 Grid Job 的資訊;而 Grid Job 加入 attribute 的用意通常作為與 Grid Task 溝通之用,而 checkpoint 則是記錄執行到何處,以便 job 執行失敗時進行 failover
- 每個 Grid Job 都可以檢視其他 job 所擁有的 attribute 與 checkpoint 相關資訊,因此可作為 Grid Job 之間溝通的橋樑
- 當然,Grid Task Session 也是可以被 Grid Task,以及所有該 task 拆出來的 Grid Job 所檢視
AspectJ AOP 設定
這個部分跟前面幾篇都是一樣的,因此就不贅述了!
範例程式
了解 Grid Task Session 的運作方式與目的之後,以下就使用程式進行說明:
GridifyHelloWorldSessionExample.java
GridifyHelloWorldSessionTask.javaimport org.gridgain.grid.*;
import org.gridgain.grid.gridify.*;
public class GridifyHelloWorldSessionExample {
/**
* @param args
*/
public static void main(String[] args) throws GridException {
GridFactory.start();
try {
sayIt("Hello(1) this(2) world(3)");
System.out.println("\n=========== local 執行完畢 ===========\n");
} finally {
GridFactory.stop(true);
}
} //end main
/**
* 只有第一次呼叫此 method 才會被 GridGain 當作 GridTask 分成 GridJobs
* 之後僅會執行 function 內的動作(印出結果)
* */
@Gridify(taskClass=GridifyHelloWorldSessionTask.class, timeout=3000)
public static void sayIt(String arg) {
System.out.println("\n=========== remote 開始執行 ===========");
System.out.println(">>>> 印出 " + arg);
System.out.println("=========== remote 執行完畢 ===========\n");
} //end sayIt
}
import org.gridgain.grid.*;
import org.gridgain.grid.resources.*;
import org.gridgain.grid.gridify.*;
import java.util.*;
import java.io.*;
//處理 Grid Task 拆開成多個 Grid Jobs 的工作
public class GridifyHelloWorldSessionTask extends GridTaskSplitAdapter<GridifyArgument> {
//注入(inject) GridTaskSessionResource
//不論是 task 或是分出去的 jobs 都可以看見此 session 與裡面的內容(attribute、checkpoint)
@GridTaskSessionResource
private GridTaskSession ses = null;
@Override
protected Collection<? extends GridJob> split(int gridSize, GridifyArgument arg) throws GridException {
//取得傳入的字串,並以空白為間隔,拆為陣列
String[] words = ((String)arg.getMethodParameters()[0]).split(" ");
//產生 Grid Job collection
List<GridJobAdapter<String>> jobs = new ArrayList<GridJobAdapter<String>>(words.length);
for(String word : words) {
jobs.add(new GridJobAdapter<String>(word) {
//注入(inject) GridJobIdResource
//雖然乍看之下 jobID 被設定為 null,但經過注入 @GridJobResource 後
//就會被實際的 GridJobId 給取代(範例:92a5c837-5d64-413c-b254-1a79038f907d)
@GridJobIdResource
private UUID jobID = null;
/**
* 執行工作:
* <ol>
* <li>遠端執行所指定的工作(印出部分字串)</li>
* <li>將所印出的部分字串當作 attribute 加入 task session</li>
* <li>等待其他 Grid Job 也將其所取得的部分字串當作 attribute 加入 task session</li>
* <li>當所有 Grid Job 的 attribute 都設定完後,印出蒐集到的全部字串</li>
* </ol>
*/
@Override
public Serializable execute() throws GridException {
//印出一開始由 task 分配到的部分字串(GridGain 不會將其視為 Grid Task 處理)
GridifyHelloWorldSessionExample.sayIt(this.getArgument());
//將 jobID attribute 加入 GridTaskSession
//因此在 task 與所有 jobs 中都可以看見此 attribute 的內容
ses.setAttribute(jobID, this.getArgument());
try {
//等待所有屬於 task 中的 job 將 attribute 設定到 session 上
for(GridJobSibling sibling : ses.getJobSiblings()) {
//檢查 attribute 是否在 session 中 (Null 代表不存在)
if(ses.waitForAttribute(sibling.getJobId()) == null)
throw new GridException("Failed to get session attribute from job " + sibling.getJobId());
} //end loop
} catch(InterruptedException e) {
throw new GridException("Got interrupted while waiting for session attribute.", e);
}
StringBuilder msg = new StringBuilder();
for(Serializable arg : ses.getAttributes().values())
msg.append(arg).append(' ');
//印出 session 中所有字串的內容(GridGain 不會將其視為 Grid Task 處理)
GridifyHelloWorldSessionExample.sayIt(msg.toString());
return null;
}
});
} //end loop
return jobs;
} //end split
@Override
public Object reduce(List<GridJobResult> arg0) throws GridException {
return null;
} //end reduce
}
重要使用須知
上述的程式碼看起來沒什麼大問題,不過有一個相當重要的地方必須注意! 就是在 GridTaskSession.waitForAttribute(Serializable) 這個 method 上。
仔細注意此 method 所在的位置,位於「Grid Job Sibling 集合的 for loop 中」,加上參數所代入的是 GridJobSibling.getJobId();因此上面那段程式碼的意思為「確定每個 GridJobSibling 都將 JobID 這個 attribute 設定到 session 中」。
這樣說來似乎也覺得沒有什麼,不過重要的是:
若 waitForAttribute(Serializeable) method 中放的不是 GridJobId,而是其他的值呢? 此時該如何運作?這樣有辦法進行嗎? 目前根據我的理解應該是不行,以下舉個範例:
假設 GridTaskA 拆成 GridJobA + GridJobB + GridJobC,假設在 remote gird node 在處理 GridJobA 的情況下,根據上面那一段 for loop 程式,會執行以下工作:
- 取得 GridJobSibling,分別是 GridJobB 與 GridJobC
- 指定 session 等待 GridJobB 將 JobID attribute 的加入,此時會等待其他 remote grid node 將 GridJobB 將其 JobID 設定進 session 中
- 確定 GridJobB 設定好 attribute
- 指定 session 等待 GridJobC 將 JobID attribute 的加入,此時會等待其他 remote grid node 將 GridJobC 將其 JobID 設定進 session 中
- Sibling 全部執行結束!
這樣看起來似乎還是沒什麼問題,但是重點來了! 在第二個與第四個流程,代入 GridTaskSession.waitForAttribute 的參數是??
沒錯! 就是 GridJobSibling.getJobId(),但這樣又有啥問題呢?
問題可大了! 這代表「使用者必須想辦法取得 Sibling 的 GridJobID」!!
因此問題慢慢釐清了,由於 GridJobSibling 提供了 getId() method,因此可以透過此 method 取得 GridJobID,所以沒有問題!
但是! 若是想要改用自行設定的值而非 JobID 呢?
恩,很抱歉! GridJobSibling 並沒有提供其他方法可以取得使用者所自訂的值!
因此,GridGain 在官方網站的文件上提到「可以透過 waitForAttribute 這種方式,讓多個 GridJob 在某種程度上達成有順序的執行」,其實還是有其限制在的;也就是說,若要判斷 session 中的 attribute 是由哪一個 GridJob 所設定的,那就必須使用 GridJobId 作為 attribute 的 key,因此在使用上還是需要多注意囉!
最後,看完上面的囉哩八嗦,應該可以了解 Grid Task Session 的使用方式以及注意事項了!
沒有留言:
張貼留言