代码修改

just-order-普洱
LAPTOP-S9HJSOEB\昊天 4 weeks ago
parent d6e7f25a1f
commit 4edbc309e1

@ -8,6 +8,7 @@ import com.zhehekeji.web.entity.Street;
import com.zhehekeji.web.mapper.CheckLogMapper; import com.zhehekeji.web.mapper.CheckLogMapper;
import com.zhehekeji.web.mapper.StreetMapper; import com.zhehekeji.web.mapper.StreetMapper;
import com.zhehekeji.web.pojo.stock.CheckLogSearch; import com.zhehekeji.web.pojo.stock.CheckLogSearch;
import com.zhehekeji.web.pojo.stock.StockStatus;
import com.zhehekeji.web.pojo.street.StreetVO; import com.zhehekeji.web.pojo.street.StreetVO;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -26,6 +27,9 @@ public class CheckLogService {
@Resource @Resource
private StreetMapper streetMapper; private StreetMapper streetMapper;
@Resource
private StockService stockService;
public PageInfo<CheckLog> list(CheckLogSearch search) { public PageInfo<CheckLog> list(CheckLogSearch search) {
List<StreetVO> street = streetMapper.list(); List<StreetVO> street = streetMapper.list();
Map<Integer, String> streetMap = street.stream().collect(Collectors.toMap(Street::getId, Street::getName)); Map<Integer, String> streetMap = street.stream().collect(Collectors.toMap(Street::getId, Street::getName));
@ -47,6 +51,10 @@ public class CheckLogService {
for (CheckLog checkLog :stockChecks){ for (CheckLog checkLog :stockChecks){
checkLog.setStreetName(streetMap.get(checkLog.getStreetId())); checkLog.setStreetName(streetMap.get(checkLog.getStreetId()));
Map<String, String> map = stockService.getCategoryList();
checkLog.setWmsCategoryName(map.get(checkLog.getWmsCategory()));
if (checkLog.getPic() != null && checkLog.getPic().length() > 0) { if (checkLog.getPic() != null && checkLog.getPic().length() > 0) {
checkLog.setPics(checkLog.getPic().split(";")); checkLog.setPics(checkLog.getPic().split(";"));
} }

@ -84,7 +84,7 @@ public class InitService implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
nettyServer.CreateNettyServer(configProperties.getServerPort()); // nettyServer.CreateNettyServer(configProperties.getServerPort());
//球机登录 //球机登录
List<Camera> cameras = cameraMapper.selectByMap(new HashMap<>(0)); List<Camera> cameras = cameraMapper.selectByMap(new HashMap<>(0));
cameras.forEach(camera -> { cameras.forEach(camera -> {
@ -93,6 +93,8 @@ public class InitService implements ApplicationRunner {
}); });
//所有客户端的IP 标识保存到map内存 //所有客户端的IP 标识保存到map内存
List<Street> streets = streetMapper.selectByMap(new HashMap<>()); List<Street> streets = streetMapper.selectByMap(new HashMap<>());
GetPhotoDelayExecutor.runExecutor(streets);
streets.forEach(street -> { streets.forEach(street -> {
if(street.getPlcIp()!=null && "".equals(street.getPlcIp())) { if(street.getPlcIp()!=null && "".equals(street.getPlcIp())) {
ClientChanel.putIp(street.getPlcIp(), street.getPlcId()); ClientChanel.putIp(street.getPlcIp(), street.getPlcId());
@ -102,7 +104,7 @@ public class InitService implements ApplicationRunner {
if(ksec != null){ if(ksec != null){
StreetConn.init(1,"ksec"); StreetConn.init(1,"ksec");
try { try {
ksecNettyClient.createClient(ksec); ksecNettyClient.connectWithRetry(ksec,0);
}catch (Exception e){ }catch (Exception e){
ksecNettyClient.reconnect(0); ksecNettyClient.reconnect(0);
log.error("kesc connect error,url:{},port:{}",ksec.getIp(),ksec.getPort()); log.error("kesc connect error,url:{},port:{}",ksec.getIp(),ksec.getPort());
@ -113,7 +115,7 @@ public class InitService implements ApplicationRunner {
} }
// TaskDelayExecutor.runMp4DownloadExecutor(); // TaskDelayExecutor.runMp4DownloadExecutor();
GetPhotoDelayExecutor.runExecutor(streets);
} }
class LoginThread extends Thread{ class LoginThread extends Thread{

@ -825,10 +825,8 @@ public class PlcService {
RestTemplate restTemplate = new RestTemplate(factory); RestTemplate restTemplate = new RestTemplate(factory);
// 定义请求URL // 定义请求URL
String url = "http://" + street.getPlcIp() + ":8097/hik/distinguish?streetNumber={streetNumber}&direction={direction}&category={category}"; String url = "http://" + street.getPlcIp() + ":8097/hik/distinguish?streetNumber={streetNumber}&direction={direction}&category={category}";
// 定义请求参数 // 定义请求参数
// 替换为实际的category // 替换为实际的category
// 发送GET请求并获取响应 // 发送GET请求并获取响应
try { try {
@ -917,7 +915,10 @@ public class PlcService {
Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("check_Num", checkLog.getId())); Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("check_Num", checkLog.getId()));
checkLog.setCount(count); checkLog.setCount(count);
stock.setCount(count); stock.setCount(count);
if (flag && checkLog.getWmsCount() == count && checkLog.getWmsCategory().equals(checkLog.getCategory())) { if( !checkLog.getWmsCategory().equals(checkLog.getCategory())){
// 品规识别错误不进行修改
}else if (flag && checkLog.getWmsCount() == count) {
checkLog.setStatus(StockStatus.SUCCESS.getStatus()); checkLog.setStatus(StockStatus.SUCCESS.getStatus());
stock.setStatus(StockStatus.SUCCESS.getStatus()); stock.setStatus(StockStatus.SUCCESS.getStatus());
} else { } else {

@ -2,6 +2,8 @@ package com.zhehekeji.web.service.client;
import com.zhehekeji.web.service.EmptyCheckService; import com.zhehekeji.web.service.EmptyCheckService;
import com.zhehekeji.web.service.PlcService; import com.zhehekeji.web.service.PlcService;
import com.zhehekeji.web.service.ksec.KescEncoder;
import com.zhehekeji.web.service.ksec.KsecDecoder;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;

@ -96,7 +96,8 @@ public class KsecDecoder extends DelimiterBasedFrameDecoder {
// dataInfo.setFromDirection(1); // dataInfo.setFromDirection(1);
// } // }
plcCmdInfo = new PlcCmdInfo(dataInfo.getSRMNumber(), dataInfo.getTaskId(), dataInfo.getFromSide(), dataInfo.getFromDirection(), dataInfo.getFromColumn(), dataInfo.getFromRow(), dataInfo.getFromSeparation(),dataInfo.getToSide(), dataInfo.getToDirection(), dataInfo.getToColumn(), dataInfo.getToRow(),dataInfo.getToSeparation(),lotnum); plcCmdInfo = new PlcCmdInfo(dataInfo.getSRMNumber(), dataInfo.getTaskId(), dataInfo.getFromSide(), dataInfo.getFromDirection(), dataInfo.getFromColumn(), dataInfo.getFromRow(), dataInfo.getFromSeparation(),dataInfo.getToSide(), dataInfo.getToDirection(), dataInfo.getToColumn(), dataInfo.getToRow(),dataInfo.getToSeparation(),lotnum);
plcCmdInfo.setCategoryName(dataInfo.getCategoryName());
plcCmdInfo.setCategoryName(dataInfo.getTypeNum());
srmNumber = dataInfo.getSRMNumber(); srmNumber = dataInfo.getSRMNumber();
cmdName = dataInfo.getCmdName(); cmdName = dataInfo.getCmdName();
} }

@ -5,6 +5,8 @@ import com.zhehekeji.web.config.ConfigProperties;
import com.zhehekeji.web.service.PlcService; import com.zhehekeji.web.service.PlcService;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
@ -14,14 +16,19 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
@Component @Component
public class KsecNettyClient { public class KsecNettyClient {
static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); static final Logger tcpLogger = LoggerFactory.getLogger("tcp");
private static EventLoopGroup group = new NioEventLoopGroup(); private static EventLoopGroup group = new NioEventLoopGroup();
@Resource @Resource
@ -32,88 +39,301 @@ public class KsecNettyClient {
/** /**
* *
*/ */
private static int RECONNECT_NUM = 10; private static final int DEFAULT_RECONNECT_NUM = 10;
private static Channel channel; private static Channel channel;
private static final ReentrantLock channelLock = new ReentrantLock();
private static final AtomicBoolean isReconnecting = new AtomicBoolean(false);
private static final AtomicInteger reconnectCount = new AtomicInteger(0);
// 保存当前closeFuture的引用用于取消之前的监听器
private static ChannelFuture closeFutureRef;
/**
*
*/
@PostConstruct
public void init() {
ConfigProperties.KSEC ksec = configProperties.getKsec();
if (ksec != null && !StringUtils.isEmpty(ksec.getIp()) && ksec.getPort() != null) {
try {
connectWithRetry(ksec, 0);
} catch (Exception e) {
log.error("初始化KSEC客户端失败", e);
}
}
}
/**
*
*/
@PreDestroy
public void destroy() {
closeChannel();
if (group != null) {
group.shutdownGracefully();
}
}
/**
*
*/
public boolean createClient(ConfigProperties.KSEC ksec) {
channelLock.lock();
try {
// 关闭旧连接
closeChannel();
public void createClient(ConfigProperties.KSEC ksec) throws InterruptedException {
String lotnum = FileUtil.getText("lastLotnum"); String lotnum = FileUtil.getText("lastLotnum");
if(lotnum != null){ if (lotnum != null) {
KsecDecoder.setLastLotnum(lotnum); KsecDecoder.setLastLotnum(lotnum);
} }
if (StringUtils.isEmpty(ksec.getIp()) || ksec.getPort() == null) { if (StringUtils.isEmpty(ksec.getIp()) || ksec.getPort() == null) {
return; log.error("KSEC配置无效: IP={}, Port={}", ksec.getIp(), ksec.getPort());
return false;
} }
log.info("开始连接KSEC服务器: {}:{}", ksec.getIp(), ksec.getPort());
Bootstrap client = new Bootstrap(); Bootstrap client = new Bootstrap();
client.group(group); client.group(group);
client.channel(NioSocketChannel.class); client.channel(NioSocketChannel.class);
KsecInfo heart = KsecInfo.heart(); KsecInfo heart = KsecInfo.heart();
client.handler(new KescFilter(heart, plcService,this)); client.handler(new KescFilter(heart, plcService, this));
// 连接服务端 // 连接服务端
channel = client.connect(ksec.getIp(), ksec.getPort()).sync().channel(); ChannelFuture future = client.connect(ksec.getIp(), ksec.getPort());
// 等待连接完成
future.awaitUninterruptibly(5, TimeUnit.SECONDS);
if (future.isSuccess()) {
channel = future.channel();
log.info("KSEC服务器连接成功: {}:{}", ksec.getIp(), ksec.getPort());
// 保存closeFuture引用
closeFutureRef = channel.closeFuture();
// 添加连接关闭监听器,自动触发重连
closeFutureRef.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
// 确保只有当前channel的close事件才触发重连
channelLock.lock();
try {
if (future.channel() == channel) {
log.warn("KSEC连接断开准备重连...");
scheduleReconnect(ksec);
} else {
log.debug("忽略旧channel的close事件");
}
} finally {
channelLock.unlock();
}
}
});
// 重置重连计数
reconnectCount.set(0);
return true;
} else {
log.error("KSEC服务器连接失败: {}:{}", ksec.getIp(), ksec.getPort(), future.cause());
return false;
}
} catch (Exception e) {
log.error("创建KSEC客户端异常", e);
return false;
} finally {
channelLock.unlock();
}
} }
/** /**
* 线 RECONNECT_NUM * 使
*
* @param upId
*/ */
public void reconnect(Integer upId) { public boolean connectWithRetry(ConfigProperties.KSEC ksec, int currentCount) {
if (channel != null) { Integer reconnectNum = ksec.getReconnectNum();
channel.disconnect(); // 当 reconnectNum < 0 时,表示无限重连
channel.close(); // 当 reconnectNum == null 时,使用默认值
} // 当 reconnectNum >= 0 时,限制重连次数
Boolean isConnected = false; int maxRetry = (reconnectNum != null && reconnectNum >= 0)
int num = 0; ? reconnectNum
ConfigProperties.KSEC ksec = configProperties.getKsec(); : DEFAULT_RECONNECT_NUM;
if (ksec == null) {
log.error("reconnect ,upPc is null ,id:{}", upId); // 检查是否达到重连上限(仅在不无限重连时检查)
return; boolean isInfiniteReconnect = reconnectNum != null && reconnectNum < 0;
if (!isInfiniteReconnect && currentCount >= maxRetry) {
log.error("KSEC初始化重连次数已达上限: {}, 停止重试", maxRetry);
// 初始化重连失败后,启动自动重连机制
scheduleReconnect(ksec);
return false;
} }
boolean connected = createClient(ksec);
if (!connected) {
log.warn("KSEC连接失败第 {} 次重试", currentCount + 1);
// 等待后重试
long interval = ksec.getReconnectInterval() > 0 ? ksec.getReconnectInterval() : 3000;
try { try {
Thread.sleep(1500); Thread.sleep(interval);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); Thread.currentThread().interrupt();
return false;
} }
while ((ksec.getReconnectNum() == -1 || num < ksec.getReconnectNum()) && !isConnected) {
return connectWithRetry(ksec, currentCount + 1);
}
try { return true;
Thread.sleep(ksec.getReconnectInterval());
} catch (InterruptedException e) {
e.printStackTrace();
} }
/**
*
*/
private void scheduleReconnect(final ConfigProperties.KSEC ksec) {
// 检查是否达到重连上限
Integer reconnectNum = ksec.getReconnectNum();
boolean isInfiniteReconnect = reconnectNum != null && reconnectNum < 0;
boolean isLimitReconnect = reconnectNum != null && reconnectNum >= 0;
if (isLimitReconnect && reconnectCount.get() >= reconnectNum) {
log.error("KSEC自动重连次数已达上限: {}, 停止重连", reconnectNum);
return;
}
// 使用 CAS 确保只有一个重连任务在执行
if (isReconnecting.compareAndSet(false, true)) {
log.info("已安排KSEC自动重连任务");
long delay = ksec.getReconnectInterval() > 0 ? ksec.getReconnectInterval() : 3000;
group.schedule(new Runnable() {
@Override
public void run() {
channelLock.lock();
try { try {
createClient(ksec); // 再次检查是否已有活跃连接(可能已被其他任务连接成功)
} catch (Exception e) { if (channel != null && channel.isActive()) {
if (channel != null) { log.info("KSEC连接已恢复取消重连");
channel.close(); isReconnecting.set(false);
return;
} }
//没连上 继续
log.error("reconnect error num:{}", num); int count = reconnectCount.incrementAndGet();
//关闭当前链接
num++; // 如果是限制重连次数且达到上限,则停止
continue; if (isLimitReconnect && count > reconnectNum) {
log.error("KSEC自动重连次数已达上限: {}, 停止重连", reconnectNum);
isReconnecting.set(false);
return;
}
log.info("开始第 {} 次自动重连...", count);
boolean connected = createClient(ksec);
if (connected) {
log.info("KSEC自动重连成功");
reconnectCount.set(0);
} else {
log.warn("KSEC自动重连失败{} 秒后继续尝试...", delay / 1000);
// 重置标记,允许下次重连
isReconnecting.set(false);
// 立即安排下次重连
scheduleReconnect(ksec);
}
} catch (Exception e) {
log.error("KSEC自动重连异常", e);
isReconnecting.set(false);
} finally {
channelLock.unlock();
} }
isConnected = true;
} }
if (isConnected) { }, delay, TimeUnit.MILLISECONDS);
log.info("plc reconnect success");
} else { } else {
log.error("plc reconnect error .upPcId:{},reconnect num:{},ip:{},port:{}", upId, num, ksec.getIp(), ksec.getPort()); log.debug("已有重连任务在执行,跳过本次重连");
} }
} }
public static void write(KsecInfo ksecInfo){ /**
if(channel != null){ *
channel.writeAndFlush(ksecInfo); */
public void reconnect(Integer upId) {
ConfigProperties.KSEC ksec = configProperties.getKsec();
if (ksec == null) {
log.error("reconnect, ksec is null, id:{}", upId);
return;
}
log.info("手动触发KSEC重连, upId:{}", upId);
isReconnecting.set(false);
scheduleReconnect(ksec);
}
/**
*
*/
private void closeChannel() {
channelLock.lock();
try {
if (channel != null && channel.isActive()) {
// 清空closeFuture引用避免重连被旧channel触发
closeFutureRef = null;
channel.disconnect();
channel.close();
log.info("KSEC连接已关闭");
}
// 清空channel引用
channel = null;
} catch (Exception e) {
log.error("关闭KSEC连接异常", e);
} finally {
channelLock.unlock();
}
}
/**
*
*/
public static void write(KsecInfo ksecInfo) {
channelLock.lock();
try {
if (channel != null && channel.isActive()) {
ChannelFuture writeFuture = channel.writeAndFlush(ksecInfo);
// 监听写入结果
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String clientIp = socketAddress.getAddress().getHostAddress(); String clientIp = socketAddress.getAddress().getHostAddress();
tcpLogger.info("write kesc data:{} channel:{}",ksecInfo,clientIp); tcpLogger.info("write kesc data:{} channel:{}", ksecInfo, clientIp);
}else { } else {
log.error(" no connected upPc"); log.error("write kesc data failed, channel inactive", future.cause());
}
}
});
} else {
log.error("KSEC未连接无法写入数据");
// 这里可以触发重连逻辑
}
} finally {
channelLock.unlock();
}
} }
/**
*
*/
public static boolean isActive() {
channelLock.lock();
try {
return channel != null && channel.isActive();
} finally {
channelLock.unlock();
}
} }
} }

@ -62,7 +62,7 @@ ksec:
ip: 127.0.0.1 ip: 127.0.0.1
port: 8001 port: 8001
#断点重连的次数:-1->不断重连 #断点重连的次数:-1->不断重连
# reconnectNum: -1 reconnectNum: -1
# #断点重连的时间间隔(单位ms) # #断点重连的时间间隔(单位ms)
# reconnectInterval: 10000 # reconnectInterval: 10000
# 服务端IP # 服务端IP

Loading…
Cancel
Save