|
|
@@ -1,6 +1,5 @@
|
|
|
package com.github.zuihou.business.mq;
|
|
|
|
|
|
-import cn.hutool.core.bean.BeanUtil;
|
|
|
import cn.hutool.core.collection.CollectionUtil;
|
|
|
import cn.hutool.core.util.ObjectUtil;
|
|
|
import cn.hutool.json.JSONArray;
|
|
|
@@ -47,7 +46,6 @@ import com.github.zuihou.business.productionResourceCenter.service.impl.AGVNodeS
|
|
|
import com.github.zuihou.business.productionResourceCenter.service.impl.MachineNodeServiceImpl;
|
|
|
import com.github.zuihou.business.productionResourceCenter.service.impl.OtherNodeServiceImpl;
|
|
|
import com.github.zuihou.business.productionResourceCenter.service.impl.RobotNodeServiceImpl;
|
|
|
-import com.github.zuihou.business.util.DynamicRabbitMq;
|
|
|
import com.github.zuihou.business.util.MsgUtil;
|
|
|
import com.github.zuihou.common.constant.*;
|
|
|
import com.github.zuihou.common.util.StringUtil;
|
|
|
@@ -60,10 +58,6 @@ import com.github.zuihou.tenant.entity.ModuleInstruction;
|
|
|
import com.github.zuihou.tenant.service.ModuleInstructionService;
|
|
|
import com.github.zuihou.tenant.service.ModuleService;
|
|
|
import com.google.common.collect.Maps;
|
|
|
-import io.swagger.models.auth.In;
|
|
|
-import jdk.nashorn.internal.runtime.regexp.joni.exception.InternalException;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import net.sf.jsqlparser.statement.select.KSQLWindow;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
@@ -73,19 +67,17 @@ import org.springframework.http.HttpEntity;
|
|
|
import org.springframework.http.HttpHeaders;
|
|
|
import org.springframework.http.MediaType;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
-import org.springframework.web.client.AsyncRestTemplate;
|
|
|
-import org.springframework.web.client.RestClientException;
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
import org.springframework.web.reactive.function.client.WebClient;
|
|
|
import reactor.core.publisher.Mono;
|
|
|
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.*;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.locks.ReentrantLock;
|
|
|
import java.util.function.Function;
|
|
|
import java.util.function.Predicate;
|
|
|
import java.util.stream.Collectors;
|
|
|
-import java.util.stream.IntStream;
|
|
|
|
|
|
import static java.util.stream.Collectors.groupingBy;
|
|
|
@Component
|
|
|
@@ -181,20 +173,14 @@ public class TaskWorkNode {
|
|
|
private String storagePath;
|
|
|
|
|
|
//总控端口
|
|
|
- private final String ZK_port = "120";
|
|
|
-
|
|
|
private static Logger logger = LoggerFactory.getLogger(TaskWorkNode.class);
|
|
|
|
|
|
- private boolean isFinal = false;
|
|
|
-
|
|
|
- private final String fileName = "";
|
|
|
+ private String fileName = "";
|
|
|
|
|
|
private static ReentrantLock lock = new ReentrantLock();
|
|
|
|
|
|
private boolean lockCondition = true;
|
|
|
|
|
|
- private String feedBack;
|
|
|
-
|
|
|
public void updateTaskStatusJob(String data, String consumerQueue, int priority) throws InterruptedException {
|
|
|
|
|
|
String returnData = "";
|
|
|
@@ -210,16 +196,6 @@ public class TaskWorkNode {
|
|
|
String taskNodeId = jsonObject.getString("taskNodeId");
|
|
|
|
|
|
logger.info("节点{}准备开始执行任务",taskNodeId);
|
|
|
- //logger.warn("节点{}(优先级{})准备开始执行任务",taskNodeId, priority);
|
|
|
- //判断是否存在优先级
|
|
|
- /*
|
|
|
- Object priorityTaskNodeIdObj = msgUtil.redis_get("PRIORITY_PROCESSING");
|
|
|
- if(!Objects.isNull(priorityTaskNodeIdObj) && StringUtil.isNotEmpty(priorityTaskNodeIdObj.toString())){
|
|
|
- String priorityTaskNodeId = priorityTaskNodeIdObj.toString();
|
|
|
- if(!priorityTaskNodeId.equals(taskNodeId)){
|
|
|
- throw new InterruptedException("存在优先任务:"+priorityTaskNodeId+",当前消息重新进入队尾");
|
|
|
- }
|
|
|
- } */
|
|
|
//优先级功能是否启用
|
|
|
Object priorityLock = msgUtil.redis_get("PRIORITY_LOCK");
|
|
|
if(priorityLock==null) {
|
|
|
@@ -292,25 +268,19 @@ public class TaskWorkNode {
|
|
|
tTask = taskMapper.selectById(taskNode.getTaskId());
|
|
|
//初始化业务日志
|
|
|
log = getBusinessLog(tTask, taskNode);
|
|
|
- feedBack = log.getFeedback();
|
|
|
try {
|
|
|
lockFlag = lock.tryLock();
|
|
|
- lockCondition = checkFlowCondition(tTask);
|
|
|
+ //lockCondition = checkFlowCondition(tTask);
|
|
|
if(lockFlag){
|
|
|
logger.info("节点{}获取到了执行锁",taskNode.getId());
|
|
|
- TaskNode hisTask = taskNodeMapper.selectById(taskNodeId);
|
|
|
- if("3".equals(hisTask.getExeResult())){
|
|
|
- logger.info("节点{}已经执行完成,重复推送丢弃",taskNodeId);
|
|
|
- }
|
|
|
-
|
|
|
//业务类型-1、是普通的节点类型,2-是线边库轮询
|
|
|
String bizType = jsonObject.getString("bizType");
|
|
|
- String handMode = jsonObject.getString("handMode");
|
|
|
+ String handMode = jsonObject.containsKey("handMode")?jsonObject.getString("handMode"):"";
|
|
|
//多产品搬运类型
|
|
|
- String carryType = jsonObject.getString("carryType");
|
|
|
-
|
|
|
- String queueName = "device.queue."+ BizConstant.MQ_GLOBAL_EXCEPTION;
|
|
|
+ String carryType = jsonObject.containsKey("carryType")?jsonObject.getString("carryType"):"";
|
|
|
|
|
|
+ //String queueName = "device.queue."+ BizConstant.MQ_GLOBAL_EXCEPTION;
|
|
|
+ /*
|
|
|
if (consumerQueue.equals(queueName)) {
|
|
|
StringBuilder buffer = new StringBuilder();
|
|
|
//处理产线存储
|
|
|
@@ -334,6 +304,7 @@ public class TaskWorkNode {
|
|
|
return;
|
|
|
}
|
|
|
logger.info("=======================" + taskNodeId);
|
|
|
+ */
|
|
|
|
|
|
if (tTask == null || tTask.getStatus() == "3") {
|
|
|
log.setTaskNodeId(taskNode.getId()).setExeResult("0").setManual("0").setFeedback("当前任务为空");
|
|
|
@@ -359,7 +330,6 @@ public class TaskWorkNode {
|
|
|
ResourceAutoCode resourceAutoCode = resourceAutoCodeService.getById(taskNode.getAutoNode().getId());
|
|
|
taskNode.setAutoNode(resourceAutoCode);
|
|
|
|
|
|
- isFinal = resourceAutoCode.isFinal() ? true : false;
|
|
|
msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE + "_" + taskNode.getResourceId(), resourceAutoCode, 3, TimeUnit.DAYS);
|
|
|
|
|
|
//更新节点任务
|
|
|
@@ -432,7 +402,6 @@ public class TaskWorkNode {
|
|
|
ZZone zZone = zoneService.getById(zoneProductionresource.getZoneId());
|
|
|
queryMap.put("zone", zZone);
|
|
|
|
|
|
-
|
|
|
BBom bom = bBomMapper.selectById(tTask.getBomId());
|
|
|
//工艺配置加工产线
|
|
|
ZZone bomzZone = zoneService.getById(bom.getZoneId());
|
|
|
@@ -442,13 +411,10 @@ public class TaskWorkNode {
|
|
|
List<TWorkpiece>list = workpieceMapper.selectList(Wraps.<TWorkpiece>lbQ().eq(TWorkpiece::getCompleteBatchNo,taskNode.getCompleteBatchNo()));
|
|
|
if(CollectionUtil.isNotEmpty(list)){
|
|
|
TWorkpiece workpiece = list.get(0);
|
|
|
- workpiece.setProcedureId(tTask.getProcedureId());
|
|
|
- workpiece.setTaskNodeId(Long.parseLong(taskNodeId));
|
|
|
+ workpiece.setProcedureId(tTask.getProcedureId()).setTaskNodeId(Long.parseLong(taskNodeId));
|
|
|
workpieceMapper.updateById(workpiece);
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
//运行条件验证
|
|
|
conMap = checkCon(taskNode, tTask, queryMap);
|
|
|
if(conMap.isEmpty()){
|
|
|
@@ -459,7 +425,7 @@ public class TaskWorkNode {
|
|
|
logger.info("节点{}检查资源返回{}",taskNode.getId(), conMap);
|
|
|
msgUtil.redis_set_map(CacheKey.TASK_CURRENT_NODE_CONDITION + "_" + taskNode.getId(), conMap);
|
|
|
|
|
|
- if((boolean)conMap.get("result")){
|
|
|
+ if(conMap.containsKey("result") && (boolean)conMap.get("result")){
|
|
|
if ("03".equals(taskNode.getInterfaceType())) {
|
|
|
// 执行线边库操作
|
|
|
if("1".equals(conMap.get("targetxbk"))){
|
|
|
@@ -594,7 +560,8 @@ public class TaskWorkNode {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- }else{
|
|
|
+ }
|
|
|
+ else{
|
|
|
logger.info("节点{}未获取到了执行锁",taskNode.getId());
|
|
|
conMap.put("result",false);
|
|
|
}
|
|
|
@@ -671,22 +638,22 @@ public class TaskWorkNode {
|
|
|
log.setExeResult("0").setManual("0").setFeedback("节点"+taskNode.getId()+"操作响应超时");
|
|
|
}else{
|
|
|
//设备不在线默认存储为空
|
|
|
- log.setExeResult("0").setManual("0").setFeedback(null);
|
|
|
+ log.setExeResult("0").setManual("0").setFeedback("设备不在线");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
if (log.getId() == null) {
|
|
|
autoNodeLogService.save(log);
|
|
|
} else {
|
|
|
- if(StringUtil.isNotEmpty(feedBack) && (feedBack.contains("imcsTOccsEnable") || !log.getFeedback().equals(feedBack))) {
|
|
|
+ if(retJson != null) {
|
|
|
log.setExecuteTime(new Date());
|
|
|
autoNodeLogService.updateAllById(log);
|
|
|
}
|
|
|
}
|
|
|
- //if(lock.tryLock()){
|
|
|
+// if(lock.tryLock()){
|
|
|
lock.unlock();
|
|
|
logger.info("节点{}名称{}释放锁",taskNode.getId(),taskNode.getNodeName());
|
|
|
- //}
|
|
|
+// }
|
|
|
logger.info("specialCallBackMyselfFlag={}",specialCallBackMyselfFlag);
|
|
|
if(specialCallBackMyselfFlag){
|
|
|
// 打标机或恒轮上传程序特殊处理
|
|
|
@@ -920,85 +887,6 @@ public class TaskWorkNode {
|
|
|
public Map checkCon(TaskNode taskNode, TTask task, Map<String, Object> dataMap) {
|
|
|
Map map = getCheckCon(taskNode, task, dataMap);
|
|
|
return map;
|
|
|
-// boolean b = (boolean) map.get("result");
|
|
|
-// if (!b) {
|
|
|
-// try {
|
|
|
-// Thread.sleep(10000);
|
|
|
-// } catch (InterruptedException e) {
|
|
|
-// e.printStackTrace();
|
|
|
-// }
|
|
|
-// return checkCon(taskNode, task, dataMap);
|
|
|
-// } else {
|
|
|
-// return map;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// logger.info("异步消息获取执行状态:");
|
|
|
-// //单线程化线程池
|
|
|
-// //初始化执行次数(默认10次) 10次后按超时处理
|
|
|
-// ExecutorService pool = Executors.newSingleThreadExecutor();
|
|
|
-// List<CompletableFuture<Map>> futures = new ArrayList<CompletableFuture<Map>>();
|
|
|
-//
|
|
|
-// List<Map> retList = new ArrayList<Map>();
|
|
|
-// int[] nums = IntStream.range(0, 9).toArray();
|
|
|
-//
|
|
|
-// CompletableFuture<Map> future = CompletableFuture.supplyAsync(() -> {
|
|
|
-// Map<String, Object> threadMap = getCheckCon(taskNode, task, dataMap);
|
|
|
-// boolean bool= (Boolean) threadMap.get("result");
|
|
|
-// return bool? threadMap: null;
|
|
|
-// }, pool).handle((result, ex) -> {
|
|
|
-// if (null != ex) {
|
|
|
-// return null;
|
|
|
-// } else {
|
|
|
-// if(null!=result){
|
|
|
-// retList.add((Map)result);
|
|
|
-// }
|
|
|
-// return result;
|
|
|
-// }
|
|
|
-// });
|
|
|
-//
|
|
|
-// if (null == future.join()){
|
|
|
-// //开启异步串行调用
|
|
|
-// for (int i : nums) {
|
|
|
-// logger.info("节点:" + taskNode.getId() + "进行第" + i + "次异步轮询条件查询");
|
|
|
-// future = future.thenApply(map -> {
|
|
|
-// Map<String, Object> threadMap = null;
|
|
|
-// try {
|
|
|
-// TimeUnit.SECONDS.sleep(5 * i);
|
|
|
-// threadMap = getCheckCon(taskNode, task, dataMap);
|
|
|
-// } catch (InterruptedException ie) {
|
|
|
-// ie.printStackTrace();
|
|
|
-// }
|
|
|
-// boolean bool= (Boolean) threadMap.get("result");
|
|
|
-// return bool? threadMap: null;
|
|
|
-// });
|
|
|
-// if (null != future.join()) {
|
|
|
-// boolean bool = (Boolean) future.join().get("result");
|
|
|
-// if (bool) {
|
|
|
-// retList.add((Map) future.join());
|
|
|
-// break;
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-// }
|
|
|
-
|
|
|
- /*
|
|
|
- CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, ex)->{
|
|
|
- if(res!=null && ex==null){
|
|
|
- retList.add((Map)res);
|
|
|
- }
|
|
|
- }); */
|
|
|
-
|
|
|
-// if (retList.size() == 0) {
|
|
|
-// logger.error("节点:" + taskNode.getId() + "条件判断超时");
|
|
|
-// //return checkCon(taskNode, task, dataMap);
|
|
|
-// //pool.shutdown();
|
|
|
-// //throw new BizException("执行异常或判断超时");
|
|
|
-// return null;
|
|
|
-// } else {
|
|
|
-// pool.shutdown();
|
|
|
-// logger.info("条件判断成功");
|
|
|
-// return retList.get(0);
|
|
|
-// }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -1009,7 +897,7 @@ public class TaskWorkNode {
|
|
|
* @return
|
|
|
*/
|
|
|
private Map getCheckCon(TaskNode taskNode, TTask task, Map<String, Object> dataMap) {
|
|
|
- logger.info("=============进入条件检测==============" + taskNode);
|
|
|
+ logger.warn("=============进入条件检测==============" + taskNode);
|
|
|
//业务类型(区分正常设备节点和线边库节点)
|
|
|
String bizType = dataMap.get("bizType") == null ? "" : dataMap.get("bizType").toString();
|
|
|
String handMode = dataMap.get("handMode") == null ? "" : dataMap.get("handMode").toString();
|
|
|
@@ -1051,7 +939,7 @@ public class TaskWorkNode {
|
|
|
nodeOperationService = MsgUtil.getBean(OtherNodeServiceImpl.class);
|
|
|
}
|
|
|
//初始化设备资源
|
|
|
- logger.info("========设备资源初始化=======" + module.getName() + "=====" + taskNode.getResourceId());
|
|
|
+ logger.warn("========设备资源初始化=======" + module.getName() + "=====" + taskNode.getResourceId());
|
|
|
nodeOperationService.initResource(taskNode, task, map);
|
|
|
|
|
|
if (bizType.equals(BizConstant.MQ_TASK_NODE_TYPE_COMMON)) {//正常节点类型
|
|
|
@@ -1059,7 +947,7 @@ public class TaskWorkNode {
|
|
|
//List<ResourceAutoCode> resourceAutoCodeList = resourceAutoCodeService.getNodeList(resourceAutoCode);
|
|
|
//业务序列节点
|
|
|
//ResourceAutoCode parentAutoCode = resourceAutoCodeService.getById(resourceAutoCode.getParentId());
|
|
|
- logger.info("=============进入自动化节点条件检测==============" + resourceAutoCode.getName());
|
|
|
+ logger.warn("=============进入自动化节点条件检测==============" + resourceAutoCode.getName());
|
|
|
|
|
|
//判断是不是在切换模式,重新排产等特殊情况下
|
|
|
map = checkPreCondition(taskNode, task, map);
|
|
|
@@ -1069,7 +957,7 @@ public class TaskWorkNode {
|
|
|
}
|
|
|
map = nodeOperationService.checkCondition(taskNode, task, map);
|
|
|
} else {
|
|
|
- logger.info("=============进入线边库条件检测==============" + resourceAutoCode.getName());
|
|
|
+ logger.warn("=============进入线边库条件检测==============" + resourceAutoCode.getName());
|
|
|
map = checkXbkCon(taskNode, task, map);
|
|
|
}
|
|
|
//System.out.println(map.get("result").toString());
|