oyq28 3 роки тому
батько
коміт
f97d6c9631

+ 12 - 4
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/mq/ConsumerHandler.java

@@ -1,6 +1,7 @@
 package com.github.zuihou.business.mq;
 
 import com.github.zuihou.business.util.DynamicRabbitMq;
+import com.github.zuihou.exception.BizException;
 import com.rabbitmq.client.Channel;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
@@ -9,6 +10,8 @@ import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
+
 @Component
 @Slf4j
 @Data
@@ -35,10 +38,15 @@ public class ConsumerHandler implements ChannelAwareMessageListener {
         try {
             handleMessage(data);
             channel.basicAck(deliveryTag, false);
-        } catch (Exception e) {
-            channel.basicNack(deliveryTag, false, false);//消息消费失败将丢弃该消息
-            //注意 : 第二个参数为true时,重新入队,进入了队列尾部
-            e.printStackTrace();
+        } catch(Exception e){
+            log.error("MQ异常:"+e.getMessage());
+            //是否重试请求
+            if (message.getMessageProperties().getRedelivered()) {
+                channel.basicNack(deliveryTag, false, false);//消息消费失败将丢弃该消息
+            }else{
+                log.error("消息消费失败将丢弃该消息");
+                channel.basicNack(deliveryTag, false, true);
+            }
         }
     }
 

+ 210 - 170
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/mq/TaskWorkNode.java

@@ -50,6 +50,7 @@ import com.github.zuihou.tenant.entity.ModuleInstruction;
 import com.github.zuihou.tenant.service.ModuleInstructionService;
 import com.github.zuihou.tenant.service.ModuleService;
 import com.google.common.collect.Maps;
+import jdk.nashorn.internal.runtime.regexp.joni.exception.InternalException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -57,14 +58,16 @@ import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestClientException;
 import org.springframework.web.client.RestTemplate;
 
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static java.util.stream.Collectors.groupingBy;
 
@@ -104,14 +107,14 @@ public class TaskWorkNode {
     private ModuleService moduleService;
 
     @Autowired
-    private ModuleInstructionService  moduleInstructionService;
+    private ModuleInstructionService moduleInstructionService;
     private NodeOperationService nodeOperationService;
 
     @Autowired
-    private ResourceBusinessService  resourceBusinessService;
+    private ResourceBusinessService resourceBusinessService;
 
     @Autowired
-    private ResourceAutoCodeService  resourceAutoCodeService;
+    private ResourceAutoCodeService resourceAutoCodeService;
 
     @Autowired
     private ZZoneProductionresourceMapper zZoneProductionresourceMapper;
@@ -147,28 +150,28 @@ public class TaskWorkNode {
 
     public void updateTaskStatusJob(String data) throws InterruptedException {
         JSONObject jsonObject = JSONObject.parseObject(data);
-        System.out.println("返回请求数据: "+data);
+        System.out.println("返回请求数据: " + data);
         String taskNodeId = jsonObject.getString("taskNodeId");
         //业务类型-1、是普通的节点类型,2-是线边库轮询
         String bizType = jsonObject.getString("bizType");
 
         //多产品搬运类型
         String carryType = jsonObject.getString("carryType");
-        if(StringUtil.isEmpty(taskNodeId) || StringUtil.isEmpty(bizType)){
-            throw new InterruptedException("全局通知异常");
+        if (StringUtil.isEmpty(taskNodeId) || StringUtil.isEmpty(bizType)) {
+            throw new RuntimeException("全局通知异常");
         }
         logger.info("=======================" + taskNodeId);
         BaseContextHandler.setTenant("0000");
 
         TaskNode taskNode = taskNodeService.getById(taskNodeId);
 //
-        if (taskNode == null || taskNode.getTaskId()== null) {
+        if (taskNode == null || taskNode.getTaskId() == null) {
             //模拟处理数据
-            throw new BizException(-1, "当前任务节点为空");
+            throw new RuntimeException("当前任务节点为空");
         }
         TTask tTask = taskMapper.selectById(taskNode.getTaskId());
         if (tTask == null || tTask.getStatus() == "3") {
-            throw new BizException(-1, "当前任务为空");
+            throw new RuntimeException("当前任务为空");
         }
 //        //获取配置序数据
         //任务初始化判断
@@ -189,7 +192,7 @@ public class TaskWorkNode {
         ResourceAutoCode resourceAutoCode = resourceAutoCodeService.getById(taskNode.getAutoNode().getId());
         taskNode.setAutoNode(resourceAutoCode);
 
-        isFinal = resourceAutoCode.isFinal()? true : false;
+        isFinal = resourceAutoCode.isFinal() ? true : false;
         msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE + "_" + taskNode.getResourceId(), resourceAutoCode);
 
         List<AAutoNodeLog> autoNodeLogList = autoNodeLogMapper.selectList(Wraps.<AAutoNodeLog>lbQ().eq(AAutoNodeLog::getTaskNodeId, taskNode.getId()));
@@ -215,8 +218,7 @@ public class TaskWorkNode {
                 //人工上下料或质检
                 //不用调接口,直接返回成功
                 msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE_STATUS + "_" + taskNode.getId(), "0");
-            }
-            else {
+            } else {
                 Map queryMap = Maps.newHashMap();
                 queryMap.put("bizType", bizType);
                 queryMap.put("carryType", carryType);
@@ -225,39 +227,42 @@ public class TaskWorkNode {
                 }
 
                 //把产线放进去,避免多次查询
-                ZZoneProductionresource zoneProductionresource = zZoneProductionresourceMapper.selectOne(Wraps.<ZZoneProductionresource>lbQ().eq(ZZoneProductionresource::getResourceId,taskNode.getResourceId()));
+                ZZoneProductionresource zoneProductionresource = zZoneProductionresourceMapper.selectOne(Wraps.<ZZoneProductionresource>lbQ().eq(ZZoneProductionresource::getResourceId, taskNode.getResourceId()));
                 ZZone zZone = zoneService.getById(zoneProductionresource.getZoneId());
-                queryMap.put("zone",zZone);
+                queryMap.put("zone", zZone);
                 //运行条件验证
                 Map conMap = checkCon(taskNode, tTask, queryMap);
-                msgUtil.redis_set_map(CacheKey.TASK_CURRENT_NODE_CONDITION+ "_" + taskNode.getId(), conMap);
+                if (null == conMap) {
+                    throw new RuntimeException("执行超时或执行设备状态异常");
+                }
+                msgUtil.redis_set_map(CacheKey.TASK_CURRENT_NODE_CONDITION + "_" + taskNode.getId(), conMap);
 
-                if("03".equals(taskNode.getInterfaceType())){
-                    TaskNode beforTaskNode = taskNodeService.getNextNTaskNode(taskNode,-1);
+                if ("03".equals(taskNode.getInterfaceType())) {
+                    TaskNode beforTaskNode = taskNodeService.getNextNTaskNode(taskNode, -1);
                     // agv搬运
                     Map agvData = new HashMap();
-                    agvData.put("start",conMap.get("start"));
+                    agvData.put("start", conMap.get("start"));
                     agvData.put("goal", conMap.get("goal"));
-                    agvData.put("taskId",tTask.getId().toString());
-                    agvData.put("taskNodeId",taskNode.getId().toString());
+                    agvData.put("taskId", tTask.getId().toString());
+                    agvData.put("taskNodeId", taskNode.getId().toString());
                     String jsonParam = getRequestParam(conMap);
                     //缓存执行当前节点传参
-                    msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE_PARAMS+"_" + taskNode.getId(), jsonParam);
+                    msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE_PARAMS + "_" + taskNode.getId(), jsonParam);
                     agvHikOrderInfoService.addHikTask(agvData);
-                }else{
+                } else {
                     //组装接口参数
                     HttpHeaders headers = new HttpHeaders();
                     headers.setContentType(MediaType.parseMediaType("application/json;charset=UTF-8"));
                     String jsonParam = getRequestParam(conMap);
                     //缓存执行当前节点传参
-                    msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE_PARAMS+"_" + taskNode.getId(), jsonParam);
+                    msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE_PARAMS + "_" + taskNode.getId(), jsonParam);
                     HttpEntity<String> formEntity = new HttpEntity<String>(jsonParam, headers);
 
                     //动态调用接口和新增指令执行时间
                     log.setMethod(conMap.get("method").toString()).setExecuteTime(new Date());
-                    String instructionUrl = DictionaryKey.INSTRUCTION_URL +"/api/"+ conMap.get("method").toString() ;
-                    System.out.println("instructionUrl="+instructionUrl);
-                    System.out.println("jsonParam="+jsonParam);
+                    String instructionUrl = DictionaryKey.INSTRUCTION_URL + "/api/" + conMap.get("method").toString();
+                    System.out.println("instructionUrl=" + instructionUrl);
+                    System.out.println("jsonParam=" + jsonParam);
                     returnData = restTemplate.postForObject(instructionUrl, formEntity, String.class);
 
                     logger.info("returnData=", returnData);
@@ -270,30 +275,31 @@ public class TaskWorkNode {
             errJsonObject.put("msg", e.getMessage());
             returnData = errJsonObject.toJSONString();
             logger.error("调用接口发生异常" + e.getMessage());
-        }
-        logger.info("===============================接口返回" + returnData);
-        JSONObject retJson = JSONObject.parseObject(returnData);
-        if(retJson!=null){
-            String code = retJson.getString("result").trim();
-            if(code == "true") {
-                //回调处理
-                taskNode.setExeStatus("3").setEndTime(new Date()).setExeResult("1");
-                taskNodeService.updateAllById(taskNode);
-                log.setExeStatus("3").setEndTime(new Date()).setExeResult("1").setFeedback("");
-                msgUtil.redis_del(CacheKey.TASK_CURRENT_NODE_CONDITION + "_" + taskNode.getId());
-                msgUtil.redis_del(CacheKey.TASK_CURRENT_NODE_PARAMS + "_" + taskNode.getId());
-            }else if(code == "0"){
-                //需要人工处理解决警报异常
-                log.setExeResult("0").setManual("1").setFeedback(retJson.getString("msg"));
-            }else if(code == "2"){
-                //执行异常处理(无须人工处理和警报提示)
-                log.setExeResult("0").setManual("0").setFeedback(retJson.getString("msg"));
+        } finally {
+            logger.info("===============================接口返回" + returnData);
+            JSONObject retJson = JSONObject.parseObject(returnData);
+            if (retJson != null) {
+                String code = retJson.getString("result").trim();
+                if (code.equals("true")) {
+                    //回调处理
+                    taskNode.setExeStatus("3").setEndTime(new Date()).setExeResult("1");
+                    taskNodeService.updateAllById(taskNode);
+                    log.setExeStatus("3").setEndTime(new Date()).setExeResult("1").setFeedback("");
+                    msgUtil.redis_del(CacheKey.TASK_CURRENT_NODE_CONDITION + "_" + taskNode.getId());
+                    msgUtil.redis_del(CacheKey.TASK_CURRENT_NODE_PARAMS + "_" + taskNode.getId());
+                } else if (code.equals("false")) {
+                    //需要人工处理解决警报异常
+                    log.setExeResult("0").setManual("1").setFeedback(retJson.getString("msg"));
+                } else if (code.equals("exception")) {
+                    //执行异常处理(无须人工处理和警报提示)
+                    log.setExeResult("0").setManual("0").setFeedback(retJson.getString("msg"));
+                }
+            }
+            if (log.getId() == null) {
+                autoNodeLogService.save(log);
+            } else {
+                autoNodeLogService.updateAllById(log);
             }
-        }
-        if(log.getId()==null){
-            autoNodeLogService.save(log);
-        }else{
-            autoNodeLogService.updateAllById(log);
         }
     }
 
@@ -341,7 +347,7 @@ public class TaskWorkNode {
         TaskNode taskNode = (TaskNode) conMap.get("taskNode");
         TTask task = (TTask) conMap.get("task");
         //业务类型
-        String bizType = conMap.get("bizType")== null ? "" : conMap.get("bizType").toString();
+        String bizType = conMap.get("bizType") == null ? "" : conMap.get("bizType").toString();
 
         String count = conMap.get(taskNode.getId() + "count") == null ? "" : conMap.get(taskNode.getId() + "count").toString();
 
@@ -355,7 +361,7 @@ public class TaskWorkNode {
         jsonObject.put("taskNodeId", taskNode.getId());
 //        jsonObject.put("bizType", bizType);
         //jsonObject.put("uid", taskNode.getCreateUser() + "");
-        jsonObject.put("url",  conMap.get("url"));
+        jsonObject.put("url", conMap.get("url"));
         jsonObject.put("port", conMap.get("port"));
         jsonObject.put("data", conMap.get("data"));
 
@@ -378,20 +384,20 @@ public class TaskWorkNode {
         }
 
         //起点、终点缓存。
-        Storge fstorge =  conMap.get("fromStorge") == null?null:(Storge) conMap.get("fromStorge") ;
-        Storge tstorge =  conMap.get("toStorge") == null?null:(Storge) conMap.get("toStorge") ;
+        Storge fstorge = conMap.get("fromStorge") == null ? null : (Storge) conMap.get("fromStorge");
+        Storge tstorge = conMap.get("toStorge") == null ? null : (Storge) conMap.get("toStorge");
         //如果是线边库的时候,目标缓存位
-        Storge aimStorge =  conMap.get("aimStorge") == null?null:(Storge) conMap.get("aimStorge") ;
+        Storge aimStorge = conMap.get("aimStorge") == null ? null : (Storge) conMap.get("aimStorge");
 
         //此数据用来业务回传
-        bizJsonObject.put("srcPosition",  fstorge==null?"":fstorge.getId().toString());
-        bizJsonObject.put("targetPostion",tstorge==null?"":tstorge.getId().toString());
-        bizJsonObject.put("aimStorge",aimStorge==null?"":aimStorge.getId().toString());
+        bizJsonObject.put("srcPosition", fstorge == null ? "" : fstorge.getId().toString());
+        bizJsonObject.put("targetPostion", tstorge == null ? "" : tstorge.getId().toString());
+        bizJsonObject.put("aimStorge", aimStorge == null ? "" : aimStorge.getId().toString());
         bizJsonObject.put("bizType", bizType);
 
         //放入到线边库
         String xbkFlag = conMap.get(DemoLineConstant.DEMOLINE_XBKFLAG) == null ? "" : conMap.get(DemoLineConstant.DEMOLINE_XBKFLAG).toString();
-        if(StringUtil.isNotEmpty(xbkFlag)){
+        if (StringUtil.isNotEmpty(xbkFlag)) {
             bizJsonObject.put(DemoLineConstant.DEMOLINE_XBKFLAG, xbkFlag);
         }
 
@@ -401,7 +407,7 @@ public class TaskWorkNode {
         jsonObject.put("code", "1");
         logger.info("节点请求参数" + jsonObject.toJSONString());
 
-        msgUtil.redis_set(YunjianConstant.YUNJIAN_CALLBACK_PARAM+"_"+taskNode.getId(), bizJsonObject.toJSONString(), 1, TimeUnit.DAYS);
+        msgUtil.redis_set(YunjianConstant.YUNJIAN_CALLBACK_PARAM + "_" + taskNode.getId(), bizJsonObject.toJSONString(), 1, TimeUnit.DAYS);
         //临时先把请求参数放出来
         msgUtil.redis_set("testParam", bizJsonObject.toJSONString(), 1, TimeUnit.DAYS);
 
@@ -418,48 +424,79 @@ public class TaskWorkNode {
      * @return
      * @throws Exception
      */
-     public Map checkCon(TaskNode taskNode, TTask task, Map<String, Object> dataMap) throws BizException, InterruptedException {
-        Map map = getCheckCon(taskNode, task, dataMap);
-        boolean b = (boolean) map.get("result");
-        if (!b) {
-            Thread.sleep(10000);
-            return checkCon(taskNode, task, dataMap);
+    public Map checkCon(TaskNode taskNode, TTask task, Map<String, Object> dataMap) {
+//        Map map = getCheckCon(taskNode, task, dataMap);
+//        boolean b = (boolean) map.get("result");
+//        if (!b) {
+//            Thread.sleep(10000);
+//            return checkCon(taskNode, task, dataMap);
+//        } else {
+//            return map;
+//        }
+        logger.info("异步消息获取执行状态:");
+        //单线程化线程池
+        //初始化执行次数(默认10次) 10次后按超时处理
+        ExecutorService pool = Executors.newSingleThreadExecutor();
+        List<CompletableFuture<Map>> futures = new ArrayList<CompletableFuture<Map>>();
+
+        List<Map> retList = new ArrayList<Map>();
+        int[] nums = IntStream.range(0, 10).toArray();
+
+        CompletableFuture<Map> future = CompletableFuture.supplyAsync(() -> {
+            Map<String, Object> threadMap = null;
+            //logger.info("线程号:"+Thread.currentThread()+"-"+i+"-获取异步条件信息");
+            threadMap = getCheckCon(taskNode, task, dataMap);
+            //boolean bool= (Boolean) threadMap.get("result");
+            return threadMap;
+        }, pool).handle((result, ex) -> {
+            if (null != ex) {
+                //logger.error("线程号:"+Thread.currentThread()+"-"+i+"-执行异常");
+                return null;
+            } else {
+                return result;
+            }
+        });
+
+        for (int i : nums) {
+            logger.info("节点:" + taskNode.getId() + "进行第" + i + "次异步轮询条件查询");
+            future = future.thenApply(map -> {
+                Map<String, Object> threadMap = null;
+                try {
+                    TimeUnit.SECONDS.sleep(2 * i);
+                    threadMap = getCheckCon(taskNode, task, dataMap);
+                } catch (InterruptedException ie) {
+                    ie.printStackTrace();
+                }
+                return threadMap;
+            });
+            if (null != future.join()) {
+                boolean bool = (Boolean) future.join().get("result");
+                if (bool) {
+                    retList.add((Map) future.join());
+                    break;
+                }
+            }
+        }
+
+         /*
+         CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, ex)->{
+             if(res!=null && ex==null){
+                 retList.add((Map)res);
+             }
+         }); */
+
+        if (retList.size() == 0) {
+            logger.error("节点:" + taskNode.getId() + "条件判断超时");
+            //return checkCon(taskNode, task, dataMap);
+            //pool.shutdown();
+            //throw new BizException("执行异常或判断超时");
+            return null;
         } else {
-            return map;
+            pool.shutdown();
+            logger.info("条件判断成功");
+            return retList.get(0);
         }
-//        logger.info("异步消息获取执行状态:");
-//        ExecutorService pool = Executors.newCachedThreadPool();
-//        CompletableFuture<Map> future = CompletableFuture.supplyAsync(()->{
-//            Map map = Maps.newHashMap();
-//            try {
-//                TimeUnit.SECONDS.sleep(2);
-//                map = getCheckCon(taskNode, task, dataMap);
-//            }
-//            catch(InterruptedException e){
-//                throw new RuntimeException(e);
-//            }
-//            return map;
-//        }, pool);
-//         future.whenCompleteAsync((v, ex) ->{
-//             if(ex == null){
-//                 Map map = future.join();
-//                 boolean bool = (boolean) map.get("result");
-//                 if(bool){
-//                     logger.info("条件判断成功,关闭线程池");
-//                     pool.shutdown();
-//                 }else{
-//                     try {
-//                         checkCon(taskNode, task, dataMap);
-//                     } catch (Exception e) {
-//                         e.printStackTrace();
-//                     }
-//                 }
-//             }else{
-//                 throw new RuntimeException(ex);
-//             }
-//         });
-//         return future.join();
-     }
+    }
 
     /**
      * 具体验证
@@ -482,7 +519,7 @@ public class TaskWorkNode {
 //        String robotType = resourceAutoCode.getResourceId().toString();
 
         //获取节点指令集
-        Module module  = getModuleInfo(task.getResourceId().toString());
+        Module module = getModuleInfo(task.getResourceId().toString());
         List<ModuleInstruction> moduleInstructions = moduleInstructionService.list(new LbqWrapper<ModuleInstruction>().eq(ModuleInstruction::getModuleId, module.getId()).eq(ModuleInstruction::getPublishStatus, "1"));
 
         //判断夹具资源
@@ -491,7 +528,7 @@ public class TaskWorkNode {
         Map<String, Object> map = Maps.newHashMap();
         map.put("taskNode", taskNode);
         map.put("task", task);
-        map.put(taskNode.getId() +"count", count);
+        map.put(taskNode.getId() + "count", count);
         map.put("bizType", bizType);
         map.put("categoryName", DictionaryKey.RESOURCE_CATEGORY.get(category));
         map.put("instructions", moduleInstructions);
@@ -507,36 +544,37 @@ public class TaskWorkNode {
             //map.put("robotType", DictionaryKey.LINE_ROBOT_NODES.get(robotType));
 //            map.put("robotType", DictionaryKey.YJ_ROBOT_NODES.get(robotType));
             nodeOperationService = MsgUtil.getBean(RobotNodeServiceImpl.class);
-        } else if("4".equals(category)) {
-            nodeOperationService  = MsgUtil.getBean(OtherNodeServiceImpl.class);
+        } else if ("4".equals(category)) {
+            nodeOperationService = MsgUtil.getBean(OtherNodeServiceImpl.class);
         }
         //初始化设备资源
-        logger.info("========设备资源初始化=======" + module.getName()+"====="+taskNode.getResourceId());
+        logger.info("========设备资源初始化=======" + module.getName() + "=====" + taskNode.getResourceId());
         nodeOperationService.initResource(taskNode, task, map);
 
-        if (bizType.equals(BizConstant.MQ_TASK_NODE_TYPE_COMMON))  {//正常节点类型
+        if (bizType.equals(BizConstant.MQ_TASK_NODE_TYPE_COMMON)) {//正常节点类型
 //            BomProcedureProductionresource bomProcedureProductionresource = bomProcedureProductionresourceService.getOne(new QueryWrapper<BomProcedureProductionresource>().eq("resourceId",task.getResourceId()).eq("procedureId", task.getProcedureId()));
             //List<ResourceAutoCode>  resourceAutoCodeList = resourceAutoCodeService.getNodeList(resourceAutoCode);
             //业务序列节点
             //ResourceAutoCode parentAutoCode = resourceAutoCodeService.getById(resourceAutoCode.getParentId());
-            logger.info("=============进入自动化节点条件检测==============" + resourceAutoCode.getName() );
+            logger.info("=============进入自动化节点条件检测==============" + resourceAutoCode.getName());
             map = nodeOperationService.checkCondition(taskNode, task, map);
-        }else{
-            logger.info("=============进入线边库条件检测==============" + resourceAutoCode.getName() );
+        } else {
+            logger.info("=============进入线边库条件检测==============" + resourceAutoCode.getName());
             map = checkXbkCon(taskNode, task, map);
         }
+        //System.out.println(map.get("result").toString());
         return map;
     }
 
     /**
-     *  线边库条件处理判断
-     * @param dataMap
+     * 线边库条件处理判断
      *
+     * @param dataMap
      * @return
      */
-    public Map checkXbkCon(TaskNode taskNode,TTask task, Map<String, Object> dataMap){
-        ZZone zone = dataMap.get("zone")==null?null:(ZZone)dataMap.get("zone");
-        int  count = Integer.parseInt (dataMap.get(taskNode.getId().toString() + "count") == null ? "0" : dataMap.get(taskNode.getId().toString() + "count").toString());
+    public Map checkXbkCon(TaskNode taskNode, TTask task, Map<String, Object> dataMap) {
+        ZZone zone = dataMap.get("zone") == null ? null : (ZZone) dataMap.get("zone");
+        int count = Integer.parseInt(dataMap.get(taskNode.getId().toString() + "count") == null ? "0" : dataMap.get(taskNode.getId().toString() + "count").toString());
 
         Storge jqrStorge = null;
 
@@ -548,7 +586,7 @@ public class TaskWorkNode {
 
         //前序是设备序并且最后一个节点是agv搬运,锁定agv相关库位信息
 
-        if(BizConstant.ROBOT_TYPE_CACHE.equals(zoneService.getRobotTypebyZone(zone.getName()))){//有缓存位的机器人
+        if (BizConstant.ROBOT_TYPE_CACHE.equals(zoneService.getRobotTypebyZone(zone.getName()))) {//有缓存位的机器人
             //执行到第几步
             dataMap.put(taskNode.getId().toString() + "count", count);
             dataMap.put(YunjianConstant.YUNJIAN_ROBORT_CACHE_FLAG, "1");
@@ -557,23 +595,23 @@ public class TaskWorkNode {
                 //判断机器人是否为空
                 Map<String, List<ProductionresourcePosition>> jqrMap = robotList.stream().collect(groupingBy(ProductionresourcePosition::getPointId));
 
-                robotList = productionresourcePositionService.getFreeProductionresourcePositionByIds(new String[] {taskNode.getResourceId().toString()});
+                robotList = productionresourcePositionService.getFreeProductionresourcePositionByIds(new String[]{taskNode.getResourceId().toString()});
 //                //目标地址和类型
 //                targetStorge = workpieceService.getWorkPieceTargetStock(taskNode.getId());
 
-                TaskNode nextTaskNode = taskNodeService.getNextNTaskNode(taskNode,1);
-                Map returnMap  = getTargetStorge(nextTaskNode,taskNode.getFindAgvFlag(),zone);
+                TaskNode nextTaskNode = taskNodeService.getNextNTaskNode(taskNode, 1);
+                Map returnMap = getTargetStorge(nextTaskNode, taskNode.getFindAgvFlag(), zone);
 
-                targetStorge = returnMap.get("store")==null?null:(Storge)returnMap.get("store");
-                Storge agvEndStationStorge = returnMap.get("agvEndStationStorge")==null?null:(Storge)returnMap.get("agvEndStationStorge");
+                targetStorge = returnMap.get("store") == null ? null : (Storge) returnMap.get("store");
+                Storge agvEndStationStorge = returnMap.get("agvEndStationStorge") == null ? null : (Storge) returnMap.get("agvEndStationStorge");
 
 //                jqrStorge = storgeService.getById(robotList.get(0).getStorgeId());
 
                 Storge startStore = workpieceService.getWorkPieceStock(taskNode.getCompleteBatchNo(), false);
 
-                ProductionresourcePosition po = logical(robotList,targetStorge,zone);
+                ProductionresourcePosition po = logical(robotList, targetStorge, zone);
                 //机器人手抓没被锁定,并且有空闲未知
-                if (po!=null) {
+                if (po != null) {
                     jqrStorge = storgeService.getById(jqrMap.get(DemoLineConstant.DEMOLINE_RJQR_ZS).get(0).getStorgeId());
                     jqrStorge.setCompleteBatchNo(task.getCompleteBatchNo());
                     storgeService.lockStorge(jqrStorge, taskNode.getId());
@@ -596,12 +634,12 @@ public class TaskWorkNode {
 
                     //满足条件--机器人空闲,设备或线边库有一个空闲
                     Map locationMap = new HashMap();
-                    locationMap.put("location",startStore);
+                    locationMap.put("location", startStore);
                     dataMap.put("Data", locationMap);
 
                     dataMap.put("method", "MoveRobotPosition");
                 }
-                dataMap.put("result", po!=null);
+                dataMap.put("result", po != null);
             } else if (count == 1) {
                 jqrStorge = (Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_JXS + taskNode.getId());
                 dataMap.put("method", "GetFlexibleWire");
@@ -609,7 +647,7 @@ public class TaskWorkNode {
                 dataMap.put("fromStorge", currentStore);
 
                 Map locationMap = new HashMap();
-                locationMap.put("location",currentStore);
+                locationMap.put("location", currentStore);
                 dataMap.put("Data", locationMap);
                 //出入库类型
                 dataMap.put(DemoLineConstant.DEMOLINE_STOCK_TYPE, DemoLineConstant.DEMOLINE_STOCK_TYPE_ALL_CRK);
@@ -626,61 +664,62 @@ public class TaskWorkNode {
                 Storge hcwStorge = (Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_HCW + taskNode.getId());
 
                 Map locationMap = new HashMap();
-                locationMap.put("location",hcwStorge);
+                locationMap.put("location", hcwStorge);
                 dataMap.put("Data", locationMap);
 
                 dataMap.put("toStorge", hcwStorge);
                 dataMap.put("result", true);
-            }if(count==3) {
+            }
+            if (count == 3) {
                 Map locationMap = new HashMap();
-                locationMap.put("location",targetStorge);
+                locationMap.put("location", targetStorge);
                 dataMap.put("Data", locationMap);
 
                 dataMap.put("method", "MoveRobotPosition");
-            }else if(count == 4){//从缓存位拿出来
+            } else if (count == 4) {//从缓存位拿出来
                 dataMap.put("method", "GetFlexibleWire");
-                Storge hcwStorge = (Storge)msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_HCW + taskNode.getId());
+                Storge hcwStorge = (Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_HCW + taskNode.getId());
 
                 Map locationMap = new HashMap();
-                locationMap.put("location",hcwStorge);
+                locationMap.put("location", hcwStorge);
                 dataMap.put("Data", locationMap);
 
                 dataMap.put("fromStorge", hcwStorge);
-                Storge handStorge = (Storge)msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_JXS + taskNode.getId());
+                Storge handStorge = (Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_JXS + taskNode.getId());
                 dataMap.put("toStorge", handStorge);
                 //出入库类型
-                dataMap.put(DemoLineConstant.DEMOLINE_STOCK_TYPE,DemoLineConstant.DEMOLINE_STOCK_TYPE_ALL_CRK);
-            }else if(count == 5){
+                dataMap.put(DemoLineConstant.DEMOLINE_STOCK_TYPE, DemoLineConstant.DEMOLINE_STOCK_TYPE_ALL_CRK);
+            } else if (count == 5) {
                 dataMap.put("method", "SendFlexibleWire");
                 //出入库类型
-                dataMap.put(DemoLineConstant.DEMOLINE_STOCK_TYPE,DemoLineConstant.DEMOLINE_STOCK_TYPE_ALL_CRK);
+                dataMap.put(DemoLineConstant.DEMOLINE_STOCK_TYPE, DemoLineConstant.DEMOLINE_STOCK_TYPE_ALL_CRK);
 
                 Map locationMap = new HashMap();
-                locationMap.put("location",targetStorge);
+                locationMap.put("location", targetStorge);
                 dataMap.put("Data", locationMap);
 
                 dataMap.put("toStorge", targetStorge);
 
-                Storge handStorge = (Storge)msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_JXS + taskNode.getId());
+                Storge handStorge = (Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + YunjianConstant.YUNJIAN_JXS + taskNode.getId());
                 dataMap.put("fromStorge", handStorge);
             }
-        }else {//没缓存位的机器人,伺服陀机
+        } else {//没缓存位的机器人,伺服陀机
 
             if (count == 0) {//第一步的时候锁定资源
                 //目标地址和类型
                 targetStorge = workpieceService.getWorkPieceTargetStock(taskNode.getId());
-                robotList = productionresourcePositionService.getFreeProductionresourcePositionByIds(new String[] {taskNode.getResourceId().toString()});
+                robotList = productionresourcePositionService.getFreeProductionresourcePositionByIds(new String[]{taskNode.getResourceId().toString()});
                 //目标地址和类型
                 jqrStorge = storgeService.getById(robotList.get(0).getStorgeId());
 
                 dataMap.put("method", "GetServoStacker");
-                if (logical(robotList,targetStorge,zone)!=null) {
+                if (logical(robotList, targetStorge, zone) != null) {
                     Storge startStore = workpieceService.getWorkPieceStock(taskNode.getCompleteBatchNo(), false);
 
                     dataMap.put("fromStorge", startStore);
 
                     Map locationMap = new HashMap();
-                    locationMap.put("location",startStore);
+                    locationMap.put("location", startStore);
                     dataMap.put("Data", locationMap);
                     //出入库类型
                     dataMap.put(DemoLineConstant.DEMOLINE_STOCK_TYPE, DemoLineConstant.DEMOLINE_STOCK_TYPE_ALL_CRK);
@@ -699,13 +738,13 @@ public class TaskWorkNode {
                     //目标缓存
                     msgUtil.redis_set(DemoCacheKey.YUNJIAN_CAMP + "_" + DemoLineConstant.YUNJIAN_TARGET_STORE + taskNode.getId(), targetStorge, 1, TimeUnit.DAYS);
                 }
-            }else if(count==1){
-                targetStorge = msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + DemoLineConstant.YUNJIAN_TARGET_STORE + taskNode.getId())==null?null:(Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + DemoLineConstant.YUNJIAN_TARGET_STORE + taskNode.getId());
+            } else if (count == 1) {
+                targetStorge = msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + DemoLineConstant.YUNJIAN_TARGET_STORE + taskNode.getId()) == null ? null : (Storge) msgUtil.redis_get(DemoCacheKey.YUNJIAN_CAMP + "_" + DemoLineConstant.YUNJIAN_TARGET_STORE + taskNode.getId());
                 dataMap.put("method", "SendServoStacker");
                 dataMap.put("toStorge", targetStorge);
 
                 Map locationMap = new HashMap();
-                locationMap.put("location",targetStorge);
+                locationMap.put("location", targetStorge);
                 dataMap.put("Data", locationMap);
             }
             dataMap.put("result", true);
@@ -714,68 +753,68 @@ public class TaskWorkNode {
     }
 
 
-    public Map getTargetStorge(TaskNode taskNode,String findAgvFlag,ZZone zZone){
+    public Map getTargetStorge(TaskNode taskNode, String findAgvFlag, ZZone zZone) {
         Map returnMap = new HashMap();
         Storge targetStorge = null;
         Storge agvEndStationStorge = null;
         Long resourceId = taskNode.getTargetResourceId();
 
-        if("1".equals(findAgvFlag)){//需要提前锁定下个接驳位
+        if ("1".equals(findAgvFlag)) {//需要提前锁定下个接驳位
             //获取接驳位。
             Map map = storgeService.getPlateStorgeByNo(zZone);
-            targetStorge = map.get("storge") ==null?null:(Storge)map.get("storge");
-            String targetResourceId = map.get("resourceId") ==null?null:(String)map.get("resourceId");
+            targetStorge = map.get("storge") == null ? null : (Storge) map.get("storge");
+            String targetResourceId = map.get("resourceId") == null ? null : (String) map.get("resourceId");
             //对面产线接驳位不为空TODO
             String[] jbwArr = DictionaryKey.YJ_ZONE_JBW.get(zZone.getName()).split(",");
             List<ProductionresourcePosition> jbwBList = productionresourcePositionService.getFreeProductionresourcePositionByNos(jbwArr);
 
-            if(targetStorge!=null&&CollectionUtil.isNotEmpty(jbwBList)){
+            if (targetStorge != null && CollectionUtil.isNotEmpty(jbwBList)) {
                 ProductionresourcePosition position = jbwBList.get(0);
-                agvEndStationStorge =storgeService.getById(position.getStorgeId());
+                agvEndStationStorge = storgeService.getById(position.getStorgeId());
 
-                returnMap.put("store",targetStorge);
+                returnMap.put("store", targetStorge);
 
                 //更新到targetresourceid
 //                TaskNode nextTaskNode = taskNodeService.getNextNTaskNode(taskNode,1);
-                TaskNode lastOperationTaskNode = taskNodeService.getOne(Wraps.<TaskNode>lbQ().eq(TaskNode::getTaskId,taskNode.getTaskId()).orderByDesc(TaskNode::getCompleteBatchSort).last("limit 1"));
+                TaskNode lastOperationTaskNode = taskNodeService.getOne(Wraps.<TaskNode>lbQ().eq(TaskNode::getTaskId, taskNode.getTaskId()).orderByDesc(TaskNode::getCompleteBatchSort).last("limit 1"));
                 taskNode.setTargetResourceId(Long.parseLong(targetResourceId));
                 lastOperationTaskNode.setTargetResourceId(position.getResourceId());
                 lastOperationTaskNode.setResourceId(taskNode.getTargetResourceId());
                 taskNodeService.updateById(taskNode);
                 taskNodeService.updateById(lastOperationTaskNode);
             }
-        }else {
+        } else {
             //目标设备
-            if(resourceId!=null){
-                List<ProductionresourcePosition> targetList = productionresourcePositionService.getFreeProductionresourcePositionByIds(new String[] {resourceId.toString()});
-                if(CollectionUtil.isNotEmpty(targetList)){
+            if (resourceId != null) {
+                List<ProductionresourcePosition> targetList = productionresourcePositionService.getFreeProductionresourcePositionByIds(new String[]{resourceId.toString()});
+                if (CollectionUtil.isNotEmpty(targetList)) {
                     targetStorge = storgeService.getById(targetList.get(0).getStorgeId());
-                    returnMap.put("store",targetStorge);
+                    returnMap.put("store", targetStorge);
                 }
             }
         }
-        returnMap.put("agvEndStationStorge",agvEndStationStorge);
+        returnMap.put("agvEndStationStorge", agvEndStationStorge);
         return returnMap;
     }
 
     /**
-     *  设备动态逻辑判断处理
+     * 设备动态逻辑判断处理
      *
      * @param robotList
      * @return
      */
-    public ProductionresourcePosition logical(List<ProductionresourcePosition> robotList,Storge targetStorge,ZZone zone){
+    public ProductionresourcePosition logical(List<ProductionresourcePosition> robotList, Storge targetStorge, ZZone zone) {
         //设备不存在
-        if(checkRobot(robotList,zone.getName())){
+        if (checkRobot(robotList, zone.getName())) {
             return null;
         }
         //资源临界判断
         int current_running_num = storgeService.getstorgeByZone(zone.getId().toString());
-        if(current_running_num == DictionaryKey.RESOURCE_MAX_NUM){
+        if (current_running_num == DictionaryKey.RESOURCE_MAX_NUM) {
             return null;
         }
 
-        if(targetStorge==null){
+        if (targetStorge == null) {
             return null;
         }
 
@@ -784,14 +823,15 @@ public class TaskWorkNode {
 
     /**
      * 判断机器人是否可用
+     *
      * @return
      */
-    public boolean checkRobot(List<ProductionresourcePosition> robotList,String  zoneName){
-        if(BizConstant.ROBOT_TYPE_NOCACHE.equals(zoneService.getRobotTypebyZone(zoneName))){
+    public boolean checkRobot(List<ProductionresourcePosition> robotList, String zoneName) {
+        if (BizConstant.ROBOT_TYPE_NOCACHE.equals(zoneService.getRobotTypebyZone(zoneName))) {
             return CollectionUtil.isNotEmpty(robotList) && robotList.size() == 1;
-        }else if(BizConstant.ROBOT_TYPE_CACHE.equals(zoneService.getRobotTypebyZone(zoneName))){//手抓必须空闲。
+        } else if (BizConstant.ROBOT_TYPE_CACHE.equals(zoneService.getRobotTypebyZone(zoneName))) {//手抓必须空闲。
             Map<String, List<ProductionresourcePosition>> jqrMap = robotList.stream().collect(groupingBy(ProductionresourcePosition::getPointId));
-            return CollectionUtil.isNotEmpty(robotList) && robotList.size() >1&&jqrMap.containsKey(DemoLineConstant.DEMOLINE_RJQR_ZS);
+            return CollectionUtil.isNotEmpty(robotList) && robotList.size() > 1 && jqrMap.containsKey(DemoLineConstant.DEMOLINE_RJQR_ZS);
         }
         return true;
     }

+ 2 - 1
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/util/DynamicRabbitMq.java

@@ -74,7 +74,8 @@ public class DynamicRabbitMq {
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//消费者需手动确认
         container.setConsumersPerQueue(1);
         container.setMessageListener(consumerHandler);
-        //container.setAdviceChain(createRetry());
+        //配置重发机制
+        container.setAdviceChain(createRetry());
         container.setDefaultRequeueRejected(false);
         container.start();
     }