浏览代码

更新异步调用方法

oyq28 3 年之前
父节点
当前提交
e2a02fcd03

+ 28 - 24
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/mq/TaskWorkNode.java

@@ -440,40 +440,44 @@ public class TaskWorkNode {
         List<CompletableFuture<Map>> futures = new ArrayList<CompletableFuture<Map>>();
 
         List<Map> retList = new ArrayList<Map>();
-        int[] nums = IntStream.range(0, 10).toArray();
+        int[] nums = IntStream.range(0, 9).toArray();
 
         CompletableFuture<Map> future = CompletableFuture.supplyAsync(() -> {
-            Map<String, Object> threadMap = null;
-            //logger.info("线程号:"+Thread.currentThread()+"-"+i+"-获取异步条件信息");
-            threadMap = getCheckCon(taskNode, task, dataMap);
-            //boolean bool= (Boolean) threadMap.get("result");
-            return threadMap;
+            Map<String, Object> threadMap = getCheckCon(taskNode, task, dataMap);
+            boolean bool= (Boolean) threadMap.get("result");
+            return bool? threadMap: null;
         }, pool).handle((result, ex) -> {
             if (null != ex) {
-                //logger.error("线程号:"+Thread.currentThread()+"-"+i+"-执行异常");
                 return null;
             } else {
+                if(null!=result){
+                   retList.add((Map)result);
+                }
                 return result;
             }
         });
 
-        for (int i : nums) {
-            logger.info("节点:" + taskNode.getId() + "进行第" + i + "次异步轮询条件查询");
-            future = future.thenApply(map -> {
-                Map<String, Object> threadMap = null;
-                try {
-                    TimeUnit.SECONDS.sleep(2 * i);
-                    threadMap = getCheckCon(taskNode, task, dataMap);
-                } catch (InterruptedException ie) {
-                    ie.printStackTrace();
-                }
-                return threadMap;
-            });
-            if (null != future.join()) {
-                boolean bool = (Boolean) future.join().get("result");
-                if (bool) {
-                    retList.add((Map) future.join());
-                    break;
+        if (null == future.join()){
+            //开启异步串行调用
+            for (int i : nums) {
+                logger.info("节点:" + taskNode.getId() + "进行第" + i + "次异步轮询条件查询");
+                future = future.thenApply(map -> {
+                    Map<String, Object> threadMap = null;
+                    try {
+                        TimeUnit.SECONDS.sleep(5 * i);
+                        threadMap = getCheckCon(taskNode, task, dataMap);
+                    } catch (InterruptedException ie) {
+                        ie.printStackTrace();
+                    }
+                    boolean bool= (Boolean) threadMap.get("result");
+                    return bool? threadMap: null;
+                });
+                if (null != future.join()) {
+                    boolean bool = (Boolean) future.join().get("result");
+                    if (bool) {
+                        retList.add((Map) future.join());
+                        break;
+                    }
                 }
             }
         }