import com.google.common.util.concurrent.AbstractFuture;
public class ResponseFuture<JsonPRotocol>extends AbstractFuture<JsonProtocol>
{
private final Executorexecutor;
public ResponseFuture()
{
if (ThreadLocalUtil.get("isServer") ==null)
{
//TODO 這里是錯誤的,把這行代碼移到一個單例的全局共享中取,避免每次new。如果是服務端,那么所有的服務端都共享一個線程池
executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new RpcThreadFacotry("CallBack"));
}
else
{
//單線程執行器
executor = MoreExecutors.directExecutor();
}
}
/**
* 當響應回來的時候,結果被設置到future中,因此從future中可以獲得一個異步的響應結果
* @param responseProtocol
*/
public void onResponse(JsonProtocolresponseProtocol)
{
//向future中設置值
super.set(responseProtocol);
}
/**
* 當響應有結果時候可以直接runnable的方法
* @param runnable
*/
public void addCallBack(Runnablerunnable)
{
super.addListener(runnable,executor);
}
}
2、等待線程獲得響應結果(使用future.get阻塞等待異步線程的響應)
ResponseFuture<JsonProtocol>reponseFuture = client.futureInvoke(JsonProtocolReqeust);
JsonProtocol JsonProtocolResponse =null;
try
{
JsonProtocolResponse =reponseFuture.get(JsonProtocolReqeust.getRpcMetadata().getTimeOut(),
TimeUnit.MILLISECONDS);
}
catch (ExecutionException e){}
catch (TimeoutException e)
{
throw new RuntimeException("調用遠程服務響應超時",e);
}
新聞熱點
疑難解答