|
@@ -425,62 +425,66 @@ public class TaskWorkNode {
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
public Map checkCon(TaskNode taskNode, TTask task, Map<String, Object> dataMap) {
|
|
|
-// Map map = getCheckCon(taskNode, task, dataMap);
|
|
|
-// boolean b = (boolean) map.get("result");
|
|
|
-// if (!b) {
|
|
|
-// Thread.sleep(10000);
|
|
|
-// return checkCon(taskNode, task, dataMap);
|
|
|
-// } else {
|
|
|
-// return map;
|
|
|
-// }
|
|
|
- logger.info("异步消息获取执行状态:");
|
|
|
- //单线程化线程池
|
|
|
- //初始化执行次数(默认10次) 10次后按超时处理
|
|
|
- ExecutorService pool = Executors.newSingleThreadExecutor();
|
|
|
- List<CompletableFuture<Map>> futures = new ArrayList<CompletableFuture<Map>>();
|
|
|
-
|
|
|
- List<Map> retList = new ArrayList<Map>();
|
|
|
- int[] nums = IntStream.range(0, 9).toArray();
|
|
|
-
|
|
|
- CompletableFuture<Map> future = CompletableFuture.supplyAsync(() -> {
|
|
|
- Map<String, Object> threadMap = getCheckCon(taskNode, task, dataMap);
|
|
|
- boolean bool= (Boolean) threadMap.get("result");
|
|
|
- return bool? threadMap: null;
|
|
|
- }, pool).handle((result, ex) -> {
|
|
|
- if (null != ex) {
|
|
|
- return null;
|
|
|
- } else {
|
|
|
- if(null!=result){
|
|
|
- retList.add((Map)result);
|
|
|
- }
|
|
|
- return result;
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
- if (null == future.join()){
|
|
|
- //开启异步串行调用
|
|
|
- for (int i : nums) {
|
|
|
- logger.info("节点:" + taskNode.getId() + "进行第" + i + "次异步轮询条件查询");
|
|
|
- future = future.thenApply(map -> {
|
|
|
- Map<String, Object> threadMap = null;
|
|
|
- try {
|
|
|
- TimeUnit.SECONDS.sleep(5 * i);
|
|
|
- threadMap = getCheckCon(taskNode, task, dataMap);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- ie.printStackTrace();
|
|
|
- }
|
|
|
- boolean bool= (Boolean) threadMap.get("result");
|
|
|
- return bool? threadMap: null;
|
|
|
- });
|
|
|
- if (null != future.join()) {
|
|
|
- boolean bool = (Boolean) future.join().get("result");
|
|
|
- if (bool) {
|
|
|
- retList.add((Map) future.join());
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
+ Map map = getCheckCon(taskNode, task, dataMap);
|
|
|
+ boolean b = (boolean) map.get("result");
|
|
|
+ if (!b) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(10000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
}
|
|
|
+ return checkCon(taskNode, task, dataMap);
|
|
|
+ } else {
|
|
|
+ return map;
|
|
|
}
|
|
|
+// logger.info("异步消息获取执行状态:");
|
|
|
+// //单线程化线程池
|
|
|
+// //初始化执行次数(默认10次) 10次后按超时处理
|
|
|
+// ExecutorService pool = Executors.newSingleThreadExecutor();
|
|
|
+// List<CompletableFuture<Map>> futures = new ArrayList<CompletableFuture<Map>>();
|
|
|
+//
|
|
|
+// List<Map> retList = new ArrayList<Map>();
|
|
|
+// int[] nums = IntStream.range(0, 9).toArray();
|
|
|
+//
|
|
|
+// CompletableFuture<Map> future = CompletableFuture.supplyAsync(() -> {
|
|
|
+// Map<String, Object> threadMap = getCheckCon(taskNode, task, dataMap);
|
|
|
+// boolean bool= (Boolean) threadMap.get("result");
|
|
|
+// return bool? threadMap: null;
|
|
|
+// }, pool).handle((result, ex) -> {
|
|
|
+// if (null != ex) {
|
|
|
+// return null;
|
|
|
+// } else {
|
|
|
+// if(null!=result){
|
|
|
+// retList.add((Map)result);
|
|
|
+// }
|
|
|
+// return result;
|
|
|
+// }
|
|
|
+// });
|
|
|
+//
|
|
|
+// if (null == future.join()){
|
|
|
+// //开启异步串行调用
|
|
|
+// for (int i : nums) {
|
|
|
+// logger.info("节点:" + taskNode.getId() + "进行第" + i + "次异步轮询条件查询");
|
|
|
+// future = future.thenApply(map -> {
|
|
|
+// Map<String, Object> threadMap = null;
|
|
|
+// try {
|
|
|
+// TimeUnit.SECONDS.sleep(5 * i);
|
|
|
+// threadMap = getCheckCon(taskNode, task, dataMap);
|
|
|
+// } catch (InterruptedException ie) {
|
|
|
+// ie.printStackTrace();
|
|
|
+// }
|
|
|
+// boolean bool= (Boolean) threadMap.get("result");
|
|
|
+// return bool? threadMap: null;
|
|
|
+// });
|
|
|
+// if (null != future.join()) {
|
|
|
+// boolean bool = (Boolean) future.join().get("result");
|
|
|
+// if (bool) {
|
|
|
+// retList.add((Map) future.join());
|
|
|
+// break;
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
|
|
|
/*
|
|
|
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, ex)->{
|
|
@@ -489,17 +493,17 @@ public class TaskWorkNode {
|
|
|
}
|
|
|
}); */
|
|
|
|
|
|
- if (retList.size() == 0) {
|
|
|
- logger.error("节点:" + taskNode.getId() + "条件判断超时");
|
|
|
- //return checkCon(taskNode, task, dataMap);
|
|
|
- //pool.shutdown();
|
|
|
- //throw new BizException("执行异常或判断超时");
|
|
|
- return null;
|
|
|
- } else {
|
|
|
- pool.shutdown();
|
|
|
- logger.info("条件判断成功");
|
|
|
- return retList.get(0);
|
|
|
- }
|
|
|
+// if (retList.size() == 0) {
|
|
|
+// logger.error("节点:" + taskNode.getId() + "条件判断超时");
|
|
|
+// //return checkCon(taskNode, task, dataMap);
|
|
|
+// //pool.shutdown();
|
|
|
+// //throw new BizException("执行异常或判断超时");
|
|
|
+// return null;
|
|
|
+// } else {
|
|
|
+// pool.shutdown();
|
|
|
+// logger.info("条件判断成功");
|
|
|
+// return retList.get(0);
|
|
|
+// }
|
|
|
}
|
|
|
|
|
|
/**
|