Просмотр исходного кода

按设备维度隔离执行状态,支持多设备并发

BSWYZ 1 неделя назад
Родитель
Сommit
4f43bad637

+ 61 - 32
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/mq/TaskWorkNode.java

@@ -77,8 +77,6 @@ import org.springframework.web.client.AsyncRestTemplate;
 import org.springframework.web.client.RestClientException;
 import org.springframework.web.client.RestTemplate;
 import org.springframework.web.reactive.function.client.WebClient;
-import reactor.core.publisher.Mono;
-
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
@@ -128,7 +126,6 @@ public class TaskWorkNode {
 
     @Autowired
     private ModuleInstructionService moduleInstructionService;
-    private NodeOperationService nodeOperationService;
 
     @Autowired
     private ResourceBusinessService resourceBusinessService;
@@ -185,15 +182,51 @@ public class TaskWorkNode {
 
     private static Logger logger = LoggerFactory.getLogger(TaskWorkNode.class);
 
-    private boolean isFinal = false;
-
     private final String fileName = "";
 
-    private static ReentrantLock lock = new ReentrantLock();
+    /**
+     * 按队列/设备维度分段锁,避免所有 MQ 消息共用一把全局锁。
+     */
+    private final ConcurrentMap<String, ReentrantLock> executionLocks = new ConcurrentHashMap<>();
 
-    private boolean lockCondition = true;
+    private ReentrantLock getExecutionLock(String lockKey) {
+        return executionLocks.computeIfAbsent(lockKey, key -> new ReentrantLock());
+    }
+
+    private String buildExecutionLockKey(String consumerQueue, TaskNode taskNode) {
+        if (StringUtils.isNotBlank(consumerQueue)) {
+            return "TASK_QUEUE_" + consumerQueue.toUpperCase().replace(".", "_");
+        }
+        if (taskNode == null) {
+            return "TASK_NODE_UNKNOWN";
+        }
+        if (taskNode.getResourceId() == null) {
+            return "TASK_NODE_UNKNOWN_" + taskNode.getId();
+        }
+        return "TASK_NODE_" + taskNode.getResourceId();
+    }
 
-    private String feedBack;
+    private void releaseExecutionLock(String lockKey, ReentrantLock executionLock) {
+        if (executionLock != null && executionLock.isHeldByCurrentThread()) {
+            executionLock.unlock();
+            if (StringUtils.isNotBlank(lockKey) && !executionLock.isLocked() && !executionLock.hasQueuedThreads()) {
+                executionLocks.remove(lockKey, executionLock);
+            }
+        }
+    }
+
+    private NodeOperationService selectNodeOperationService(String category) {
+        if ("1".equals(category)) {
+            return MsgUtil.getBean(MachineNodeServiceImpl.class);
+        } else if ("2".equals(category)) {
+            return MsgUtil.getBean(AGVNodeServiceImpl.class);
+        } else if ("3".equals(category)) {
+            return MsgUtil.getBean(RobotNodeServiceImpl.class);
+        } else if ("4".equals(category)) {
+            return MsgUtil.getBean(OtherNodeServiceImpl.class);
+        }
+        throw new BizException("不支持的节点类型:" + category);
+    }
 
     public void updateTaskStatusJob(String data, String consumerQueue, int priority) throws InterruptedException {
 
@@ -201,9 +234,12 @@ public class TaskWorkNode {
         boolean specialCallBackMyselfFlag = false;
         Map conMap = new HashMap();
         boolean lockFlag = false;
+        ReentrantLock executionLock = null;
+        String executionLockKey = null;
         TaskNode taskNode = null;
         TTask tTask = null;
         AAutoNodeLog log = null;
+        String feedBackSnapshot = null;
 
         BaseContextHandler.setTenant("0000");
         JSONObject jsonObject = JSONObject.parseObject(data);
@@ -292,10 +328,11 @@ public class TaskWorkNode {
         tTask = taskMapper.selectById(taskNode.getTaskId());
         //初始化业务日志
         log = getBusinessLog(tTask, taskNode);
-        feedBack = log.getFeedback();
+        feedBackSnapshot = log == null ? null : log.getFeedback();
         try {
-            lockFlag = lock.tryLock();
-            lockCondition = checkFlowCondition(tTask);
+            executionLockKey = buildExecutionLockKey(consumerQueue, taskNode);
+            executionLock = getExecutionLock(executionLockKey);
+            lockFlag = executionLock.tryLock();
             if(lockFlag){
                 logger.info("节点{}获取到了执行锁",taskNode.getId());
                 TaskNode hisTask = taskNodeMapper.selectById(taskNodeId);
@@ -359,7 +396,6 @@ public class TaskWorkNode {
                 ResourceAutoCode resourceAutoCode = resourceAutoCodeService.getById(taskNode.getAutoNode().getId());
                 taskNode.setAutoNode(resourceAutoCode);
 
-                isFinal = resourceAutoCode.isFinal() ? true : false;
                 msgUtil.redis_set(CacheKey.TASK_CURRENT_NODE + "_" + taskNode.getResourceId(), resourceAutoCode, 3, TimeUnit.DAYS);
 
                 //更新节点任务
@@ -617,7 +653,7 @@ public class TaskWorkNode {
             e.printStackTrace();
         } finally {
             logger.info("returnData={}",returnData);
-            JSONObject retJson = JSONObject.parseObject(returnData);
+            JSONObject retJson = StringUtils.isNotBlank(returnData) ? JSONObject.parseObject(returnData) : null;
             if (retJson != null) {
                 String code = retJson.getString("result").trim();
                 String concurrency = retJson.containsKey("concurrency")? retJson.getString("concurrency").trim() : "false";
@@ -678,15 +714,15 @@ public class TaskWorkNode {
             if (log.getId() == null) {
                 autoNodeLogService.save(log);
             } else {
-                if(StringUtil.isNotEmpty(feedBack) && StringUtil.isNotEmpty(log.getFeedback()) && !log.getFeedback().equals(feedBack)) {
+                if(StringUtil.isNotEmpty(feedBackSnapshot) && StringUtil.isNotEmpty(log.getFeedback()) && !log.getFeedback().equals(feedBackSnapshot)) {
                     log.setExecuteTime(new Date());
                     autoNodeLogService.updateAllById(log);
                 }
             }
-            //if(lock.tryLock()){
-                lock.unlock();
+            if(lockFlag){
+                releaseExecutionLock(executionLockKey, executionLock);
                 logger.info("节点{}名称{}释放锁",taskNode.getId(),taskNode.getNodeName());
-            //}
+            }
             logger.info("specialCallBackMyselfFlag={}",specialCallBackMyselfFlag);
             if(specialCallBackMyselfFlag){
                 // 打标机或恒轮上传程序特殊处理
@@ -1038,18 +1074,7 @@ public class TaskWorkNode {
         map.put("zone", dataMap.get("zone"));
         map.put("bomzZone", dataMap.get("bomzZone"));
 
-        if ("1".equals(category)) {
-            nodeOperationService = MsgUtil.getBean(MachineNodeServiceImpl.class);
-        } else if ("2".equals(category)) {
-            nodeOperationService = MsgUtil.getBean(AGVNodeServiceImpl.class);
-        } else if ("3".equals(category)) {
-            //默认示范线设备
-            //map.put("robotType", DictionaryKey.LINE_ROBOT_NODES.get(robotType));
-//            map.put("robotType", DictionaryKey.YJ_ROBOT_NODES.get(robotType));
-            nodeOperationService = MsgUtil.getBean(RobotNodeServiceImpl.class);
-        } else if ("4".equals(category)) {
-            nodeOperationService = MsgUtil.getBean(OtherNodeServiceImpl.class);
-        }
+        NodeOperationService nodeOperationService = selectNodeOperationService(category);
         //初始化设备资源
         logger.info("========设备资源初始化=======" + module.getName() + "=====" + taskNode.getResourceId());
         nodeOperationService.initResource(taskNode, task, map);
@@ -2006,9 +2031,13 @@ public class TaskWorkNode {
         WebClient webClient = WebClient.create(hostSystemUrl);
         jsonObject.put("taskId", "1");
         jsonObject.put("taskNodeId", "1");
-        Mono mono = webClient.post().uri("/api/UploadFile").contentType(MediaType.APPLICATION_JSON)
-                .syncBody(jsonObject).retrieve().bodyToMono(String.class);
-        System.out.println(mono.block());
+        webClient.post().uri("/api/UploadFile").contentType(MediaType.APPLICATION_JSON)
+                .bodyValue(jsonObject)
+                .retrieve()
+                .bodyToMono(String.class)
+                .doOnSuccess(response -> logger.info("异步上传完成: {}", response))
+                .doOnError(error -> logger.error("异步上传失败", error))
+                .subscribe();
         return true;
     }
 }

+ 2 - 0
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/productionResourceCenter/service/impl/AGVNodeServiceImpl.java

@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import java.util.HashMap;
@@ -41,6 +42,7 @@ import com.github.zuihou.business.util.MsgUtil;
 
 @Slf4j
 @Service
+@Scope("prototype")
 public class AGVNodeServiceImpl implements NodeOperationService {
 
     @Autowired

+ 2 - 0
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/productionResourceCenter/service/impl/MachineNodeServiceImpl.java

@@ -46,6 +46,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -57,6 +58,7 @@ import static java.util.stream.Collectors.groupingBy;
 
 @Slf4j
 @Service
+@Scope("prototype")
 public class MachineNodeServiceImpl implements NodeOperationService {
 
     @Autowired

+ 2 - 0
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/productionResourceCenter/service/impl/OtherNodeServiceImpl.java

@@ -30,6 +30,7 @@ import com.google.common.collect.Maps;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.utils.Lists;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
@@ -41,6 +42,7 @@ import java.util.stream.Collectors;
 
 @Slf4j
 @Service
+@Scope("prototype")
 public class OtherNodeServiceImpl implements NodeOperationService {
 
     private Map<String, Object> map = Maps.newHashMap();

+ 2 - 0
imcs-admin-boot/imcs-business-biz/src/main/java/com/github/zuihou/business/productionResourceCenter/service/impl/RobotNodeServiceImpl.java

@@ -62,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Scope;
 import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
@@ -82,6 +83,7 @@ import static java.util.stream.Collectors.toList;
 
 @Slf4j
 @Service
+@Scope("prototype")
 public class RobotNodeServiceImpl implements NodeOperationService {
 
     @Autowired