diff --git a/web/src/main/java/com/zhehekeji/web/service/CheckLogService.java b/web/src/main/java/com/zhehekeji/web/service/CheckLogService.java index 1a33b1e..2ecdabb 100644 --- a/web/src/main/java/com/zhehekeji/web/service/CheckLogService.java +++ b/web/src/main/java/com/zhehekeji/web/service/CheckLogService.java @@ -8,6 +8,7 @@ import com.zhehekeji.web.entity.Street; import com.zhehekeji.web.mapper.CheckLogMapper; import com.zhehekeji.web.mapper.StreetMapper; import com.zhehekeji.web.pojo.stock.CheckLogSearch; +import com.zhehekeji.web.pojo.stock.StockStatus; import com.zhehekeji.web.pojo.street.StreetVO; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @@ -26,6 +27,9 @@ public class CheckLogService { @Resource private StreetMapper streetMapper; + @Resource + private StockService stockService; + public PageInfo list(CheckLogSearch search) { List street = streetMapper.list(); Map streetMap = street.stream().collect(Collectors.toMap(Street::getId, Street::getName)); @@ -47,6 +51,10 @@ public class CheckLogService { for (CheckLog checkLog :stockChecks){ checkLog.setStreetName(streetMap.get(checkLog.getStreetId())); + + Map map = stockService.getCategoryList(); + + checkLog.setWmsCategoryName(map.get(checkLog.getWmsCategory())); if (checkLog.getPic() != null && checkLog.getPic().length() > 0) { checkLog.setPics(checkLog.getPic().split(";")); } diff --git a/web/src/main/java/com/zhehekeji/web/service/InitService.java b/web/src/main/java/com/zhehekeji/web/service/InitService.java index 6268bab..95019c7 100644 --- a/web/src/main/java/com/zhehekeji/web/service/InitService.java +++ b/web/src/main/java/com/zhehekeji/web/service/InitService.java @@ -84,7 +84,7 @@ public class InitService implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - nettyServer.CreateNettyServer(configProperties.getServerPort()); +// nettyServer.CreateNettyServer(configProperties.getServerPort()); //球机登录 List cameras = cameraMapper.selectByMap(new HashMap<>(0)); cameras.forEach(camera -> { @@ -93,6 +93,8 @@ public class InitService implements ApplicationRunner { }); //所有客户端的IP 标识保存到map内存 List streets = streetMapper.selectByMap(new HashMap<>()); + + GetPhotoDelayExecutor.runExecutor(streets); streets.forEach(street -> { if(street.getPlcIp()!=null && "".equals(street.getPlcIp())) { ClientChanel.putIp(street.getPlcIp(), street.getPlcId()); @@ -102,7 +104,7 @@ public class InitService implements ApplicationRunner { if(ksec != null){ StreetConn.init(1,"ksec"); try { - ksecNettyClient.createClient(ksec); + ksecNettyClient.connectWithRetry(ksec,0); }catch (Exception e){ ksecNettyClient.reconnect(0); log.error("kesc connect error,url:{},port:{}",ksec.getIp(),ksec.getPort()); @@ -113,7 +115,7 @@ public class InitService implements ApplicationRunner { } // TaskDelayExecutor.runMp4DownloadExecutor(); - GetPhotoDelayExecutor.runExecutor(streets); + } class LoginThread extends Thread{ diff --git a/web/src/main/java/com/zhehekeji/web/service/PlcService.java b/web/src/main/java/com/zhehekeji/web/service/PlcService.java index 45c4ef8..5fd92de 100644 --- a/web/src/main/java/com/zhehekeji/web/service/PlcService.java +++ b/web/src/main/java/com/zhehekeji/web/service/PlcService.java @@ -825,10 +825,8 @@ public class PlcService { RestTemplate restTemplate = new RestTemplate(factory); // 定义请求URL String url = "http://" + street.getPlcIp() + ":8097/hik/distinguish?streetNumber={streetNumber}&direction={direction}&category={category}"; - // 定义请求参数 // 替换为实际的category - // 发送GET请求并获取响应 try { @@ -917,7 +915,10 @@ public class PlcService { Stock stock = stockMapper.selectOne(new QueryWrapper().eq("check_Num", checkLog.getId())); checkLog.setCount(count); stock.setCount(count); - if (flag && checkLog.getWmsCount() == count && checkLog.getWmsCategory().equals(checkLog.getCategory())) { + if( !checkLog.getWmsCategory().equals(checkLog.getCategory())){ +// 品规识别错误不进行修改 + + }else if (flag && checkLog.getWmsCount() == count) { checkLog.setStatus(StockStatus.SUCCESS.getStatus()); stock.setStatus(StockStatus.SUCCESS.getStatus()); } else { diff --git a/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java b/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java index a52bb4e..7c64530 100644 --- a/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java +++ b/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java @@ -2,6 +2,8 @@ package com.zhehekeji.web.service.client; import com.zhehekeji.web.service.EmptyCheckService; import com.zhehekeji.web.service.PlcService; +import com.zhehekeji.web.service.ksec.KescEncoder; +import com.zhehekeji.web.service.ksec.KsecDecoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; diff --git a/web/src/main/java/com/zhehekeji/web/service/ksec/KsecDecoder.java b/web/src/main/java/com/zhehekeji/web/service/ksec/KsecDecoder.java index cac0598..95d7794 100644 --- a/web/src/main/java/com/zhehekeji/web/service/ksec/KsecDecoder.java +++ b/web/src/main/java/com/zhehekeji/web/service/ksec/KsecDecoder.java @@ -96,7 +96,8 @@ public class KsecDecoder extends DelimiterBasedFrameDecoder { // dataInfo.setFromDirection(1); // } plcCmdInfo = new PlcCmdInfo(dataInfo.getSRMNumber(), dataInfo.getTaskId(), dataInfo.getFromSide(), dataInfo.getFromDirection(), dataInfo.getFromColumn(), dataInfo.getFromRow(), dataInfo.getFromSeparation(),dataInfo.getToSide(), dataInfo.getToDirection(), dataInfo.getToColumn(), dataInfo.getToRow(),dataInfo.getToSeparation(),lotnum); - plcCmdInfo.setCategoryName(dataInfo.getCategoryName()); + + plcCmdInfo.setCategoryName(dataInfo.getTypeNum()); srmNumber = dataInfo.getSRMNumber(); cmdName = dataInfo.getCmdName(); } diff --git a/web/src/main/java/com/zhehekeji/web/service/ksec/KsecNettyClient.java b/web/src/main/java/com/zhehekeji/web/service/ksec/KsecNettyClient.java index c6497a9..471b200 100644 --- a/web/src/main/java/com/zhehekeji/web/service/ksec/KsecNettyClient.java +++ b/web/src/main/java/com/zhehekeji/web/service/ksec/KsecNettyClient.java @@ -5,6 +5,8 @@ import com.zhehekeji.web.config.ConfigProperties; import com.zhehekeji.web.service.PlcService; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; @@ -14,14 +16,19 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import javax.annotation.Resource; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @Component public class KsecNettyClient { - static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); private static EventLoopGroup group = new NioEventLoopGroup(); @Resource @@ -32,88 +39,301 @@ public class KsecNettyClient { /** * 重连最大次数 */ - private static int RECONNECT_NUM = 10; + private static final int DEFAULT_RECONNECT_NUM = 10; private static Channel channel; + private static final ReentrantLock channelLock = new ReentrantLock(); + private static final AtomicBoolean isReconnecting = new AtomicBoolean(false); + private static final AtomicInteger reconnectCount = new AtomicInteger(0); + + // 保存当前closeFuture的引用,用于取消之前的监听器 + private static ChannelFuture closeFutureRef; + + /** + * 初始化并启动客户端连接 + */ + @PostConstruct + public void init() { + ConfigProperties.KSEC ksec = configProperties.getKsec(); + if (ksec != null && !StringUtils.isEmpty(ksec.getIp()) && ksec.getPort() != null) { + try { + connectWithRetry(ksec, 0); + } catch (Exception e) { + log.error("初始化KSEC客户端失败", e); + } + } + } + + /** + * 关闭客户端连接 + */ + @PreDestroy + public void destroy() { + closeChannel(); + if (group != null) { + group.shutdownGracefully(); + } + } + + /** + * 创建客户端连接(返回连接结果) + */ + public boolean createClient(ConfigProperties.KSEC ksec) { + channelLock.lock(); + try { + // 关闭旧连接 + closeChannel(); + + String lotnum = FileUtil.getText("lastLotnum"); + if (lotnum != null) { + KsecDecoder.setLastLotnum(lotnum); + } + if (StringUtils.isEmpty(ksec.getIp()) || ksec.getPort() == null) { + log.error("KSEC配置无效: IP={}, Port={}", ksec.getIp(), ksec.getPort()); + return false; + } + + log.info("开始连接KSEC服务器: {}:{}", ksec.getIp(), ksec.getPort()); + + Bootstrap client = new Bootstrap(); + client.group(group); + client.channel(NioSocketChannel.class); + KsecInfo heart = KsecInfo.heart(); + client.handler(new KescFilter(heart, plcService, this)); + + // 连接服务端 + ChannelFuture future = client.connect(ksec.getIp(), ksec.getPort()); + + // 等待连接完成 + future.awaitUninterruptibly(5, TimeUnit.SECONDS); + + if (future.isSuccess()) { + channel = future.channel(); + log.info("KSEC服务器连接成功: {}:{}", ksec.getIp(), ksec.getPort()); + + // 保存closeFuture引用 + closeFutureRef = channel.closeFuture(); + + // 添加连接关闭监听器,自动触发重连 + closeFutureRef.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + // 确保只有当前channel的close事件才触发重连 + channelLock.lock(); + try { + if (future.channel() == channel) { + log.warn("KSEC连接断开,准备重连..."); + scheduleReconnect(ksec); + } else { + log.debug("忽略旧channel的close事件"); + } + } finally { + channelLock.unlock(); + } + } + }); + + // 重置重连计数 + reconnectCount.set(0); + return true; + } else { + log.error("KSEC服务器连接失败: {}:{}", ksec.getIp(), ksec.getPort(), future.cause()); + return false; + } + } catch (Exception e) { + log.error("创建KSEC客户端异常", e); + return false; + } finally { + channelLock.unlock(); + } + } + + /** + * 带重试的连接(初始化时使用,连接失败会阻塞重试) + */ + public boolean connectWithRetry(ConfigProperties.KSEC ksec, int currentCount) { + Integer reconnectNum = ksec.getReconnectNum(); + // 当 reconnectNum < 0 时,表示无限重连 + // 当 reconnectNum == null 时,使用默认值 + // 当 reconnectNum >= 0 时,限制重连次数 + int maxRetry = (reconnectNum != null && reconnectNum >= 0) + ? reconnectNum + : DEFAULT_RECONNECT_NUM; + + // 检查是否达到重连上限(仅在不无限重连时检查) + boolean isInfiniteReconnect = reconnectNum != null && reconnectNum < 0; + if (!isInfiniteReconnect && currentCount >= maxRetry) { + log.error("KSEC初始化重连次数已达上限: {}, 停止重试", maxRetry); + // 初始化重连失败后,启动自动重连机制 + scheduleReconnect(ksec); + return false; + } + + boolean connected = createClient(ksec); + if (!connected) { + log.warn("KSEC连接失败,第 {} 次重试", currentCount + 1); + + // 等待后重试 + long interval = ksec.getReconnectInterval() > 0 ? ksec.getReconnectInterval() : 3000; - public void createClient(ConfigProperties.KSEC ksec) throws InterruptedException { - String lotnum = FileUtil.getText("lastLotnum"); - if(lotnum != null){ - KsecDecoder.setLastLotnum(lotnum); + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + + return connectWithRetry(ksec, currentCount + 1); } - if (StringUtils.isEmpty(ksec.getIp()) || ksec.getPort() == null) { + + return true; + } + + /** + * 定时重连(连接断开后自动触发,异步执行) + */ + private void scheduleReconnect(final ConfigProperties.KSEC ksec) { + // 检查是否达到重连上限 + Integer reconnectNum = ksec.getReconnectNum(); + boolean isInfiniteReconnect = reconnectNum != null && reconnectNum < 0; + boolean isLimitReconnect = reconnectNum != null && reconnectNum >= 0; + + if (isLimitReconnect && reconnectCount.get() >= reconnectNum) { + log.error("KSEC自动重连次数已达上限: {}, 停止重连", reconnectNum); return; } - Bootstrap client = new Bootstrap(); - client.group(group); - client.channel(NioSocketChannel.class); - KsecInfo heart = KsecInfo.heart(); - client.handler(new KescFilter(heart, plcService,this)); - // 连接服务端 - channel = client.connect(ksec.getIp(), ksec.getPort()).sync().channel(); + // 使用 CAS 确保只有一个重连任务在执行 + if (isReconnecting.compareAndSet(false, true)) { + log.info("已安排KSEC自动重连任务"); + + long delay = ksec.getReconnectInterval() > 0 ? ksec.getReconnectInterval() : 3000; + + group.schedule(new Runnable() { + @Override + public void run() { + channelLock.lock(); + try { + // 再次检查是否已有活跃连接(可能已被其他任务连接成功) + if (channel != null && channel.isActive()) { + log.info("KSEC连接已恢复,取消重连"); + isReconnecting.set(false); + return; + } + + int count = reconnectCount.incrementAndGet(); + + // 如果是限制重连次数且达到上限,则停止 + if (isLimitReconnect && count > reconnectNum) { + log.error("KSEC自动重连次数已达上限: {}, 停止重连", reconnectNum); + isReconnecting.set(false); + return; + } + + log.info("开始第 {} 次自动重连...", count); + + boolean connected = createClient(ksec); + if (connected) { + log.info("KSEC自动重连成功"); + reconnectCount.set(0); + } else { + log.warn("KSEC自动重连失败,{} 秒后继续尝试...", delay / 1000); + // 重置标记,允许下次重连 + isReconnecting.set(false); + // 立即安排下次重连 + scheduleReconnect(ksec); + } + } catch (Exception e) { + log.error("KSEC自动重连异常", e); + isReconnecting.set(false); + } finally { + channelLock.unlock(); + } + } + }, delay, TimeUnit.MILLISECONDS); + } else { + log.debug("已有重连任务在执行,跳过本次重连"); + } } /** - * 断线重连 尝试 RECONNECT_NUM 次 - * - * @param upId + * 手动触发重连 */ public void reconnect(Integer upId) { - if (channel != null) { - channel.disconnect(); - channel.close(); - } - Boolean isConnected = false; - int num = 0; ConfigProperties.KSEC ksec = configProperties.getKsec(); if (ksec == null) { - log.error("reconnect ,upPc is null ,id:{}", upId); + log.error("reconnect, ksec is null, id:{}", upId); return; } + + log.info("手动触发KSEC重连, upId:{}", upId); + isReconnecting.set(false); + scheduleReconnect(ksec); + } + + /** + * 关闭连接 + */ + private void closeChannel() { + channelLock.lock(); try { - Thread.sleep(1500); - } catch (InterruptedException e) { - e.printStackTrace(); + if (channel != null && channel.isActive()) { + // 清空closeFuture引用,避免重连被旧channel触发 + closeFutureRef = null; + + channel.disconnect(); + channel.close(); + log.info("KSEC连接已关闭"); + } + // 清空channel引用 + channel = null; + } catch (Exception e) { + log.error("关闭KSEC连接异常", e); + } finally { + channelLock.unlock(); } - while ((ksec.getReconnectNum() == -1 || num < ksec.getReconnectNum()) && !isConnected) { + } + /** + * 写入数据(自动重连) + */ + public static void write(KsecInfo ksecInfo) { + channelLock.lock(); + try { + if (channel != null && channel.isActive()) { + ChannelFuture writeFuture = channel.writeAndFlush(ksecInfo); - try { - Thread.sleep(ksec.getReconnectInterval()); - } catch (InterruptedException e) { - e.printStackTrace(); + // 监听写入结果 + writeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (future.isSuccess()) { + InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); + String clientIp = socketAddress.getAddress().getHostAddress(); + tcpLogger.info("write kesc data:{} channel:{}", ksecInfo, clientIp); + } else { + log.error("write kesc data failed, channel inactive", future.cause()); + } + } + }); + } else { + log.error("KSEC未连接,无法写入数据"); + // 这里可以触发重连逻辑 } - try { - createClient(ksec); - } catch (Exception e) { - if (channel != null) { - channel.close(); - } - //没连上 继续 - log.error("reconnect error num:{}", num); - //关闭当前链接 - num++; - continue; - } - isConnected = true; - } - if (isConnected) { - log.info("plc reconnect success"); - } else { - log.error("plc reconnect error .upPcId:{},reconnect num:{},ip:{},port:{}", upId, num, ksec.getIp(), ksec.getPort()); + } finally { + channelLock.unlock(); } } - public static void write(KsecInfo ksecInfo){ - if(channel != null){ - channel.writeAndFlush(ksecInfo); - InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); - String clientIp = socketAddress.getAddress().getHostAddress(); - tcpLogger.info("write kesc data:{} channel:{}",ksecInfo,clientIp); - }else { - log.error(" no connected upPc"); + /** + * 检查连接是否活跃 + */ + public static boolean isActive() { + channelLock.lock(); + try { + return channel != null && channel.isActive(); + } finally { + channelLock.unlock(); } - } } diff --git a/web/src/main/resources/application-prod.yml b/web/src/main/resources/application-prod.yml index c4f2aeb..79b0376 100644 --- a/web/src/main/resources/application-prod.yml +++ b/web/src/main/resources/application-prod.yml @@ -62,7 +62,7 @@ ksec: ip: 127.0.0.1 port: 8001 #断点重连的次数:-1->不断重连 -# reconnectNum: -1 + reconnectNum: -1 # #断点重连的时间间隔(单位:ms) # reconnectInterval: 10000 # 服务端IP