diff --git a/web/pom.xml b/web/pom.xml index a889ee3..3ceb44d 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -70,6 +70,12 @@ system ${project.basedir}/src/main/resources/libs/jna.jar + + + com.hierynomus + sshj + 0.35.0 + com.sun.jna.examples diff --git a/web/src/main/java/com/zhehekeji/web/config/ConfigProperties.java b/web/src/main/java/com/zhehekeji/web/config/ConfigProperties.java index c2ecca6..a626f1a 100644 --- a/web/src/main/java/com/zhehekeji/web/config/ConfigProperties.java +++ b/web/src/main/java/com/zhehekeji/web/config/ConfigProperties.java @@ -94,6 +94,7 @@ public class ConfigProperties { private Integer port; private Integer reconnectNum = 10; private long reconnectInterval = 10000; + private long heartBeatTime = 30; } @Data diff --git a/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java b/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java index 240d896..867d700 100644 --- a/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java +++ b/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java @@ -4,13 +4,13 @@ package com.zhehekeji.web.service.client; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; +import org.apache.poi.ss.formula.functions.T; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.time.LocalDateTime; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** @@ -19,25 +19,32 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class ClientChanel { - private static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); + + static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); + + /** + * key : 巷道标识符 + */ + static Map channelMap = new ConcurrentHashMap<>(); + /** * key : 巷道标识符 */ - private static Map channelMap = new ConcurrentHashMap<>(); + static Map channelStringTime = new ConcurrentHashMap<>(); /** * key :IP * value: 巷道标识符 */ - private static Map IP_SRMNumberMap = new ConcurrentHashMap<>(); + static Map IP_SRMNumberMap = new ConcurrentHashMap<>(); /** * key :巷道标识符 * value: IP */ - private static Map SRMNumber_IPMap = new ConcurrentHashMap<>(); + static Map SRMNumber_IPMap = new ConcurrentHashMap<>(); public static void putIp(String ip,String ID){ IP_SRMNumberMap.put(ip,ID); @@ -45,6 +52,7 @@ public class ClientChanel { public static void putSRMNUmber_Ip(String ID,String ip){ SRMNumber_IPMap.put(ID,ip); + IP_SRMNumberMap.put(ip,ID); } public static String getIpFromId(String ID){ @@ -64,9 +72,23 @@ public class ClientChanel { InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); String clientIp = socketAddress.getAddress().getHostAddress(); putSRMNUmber_Ip(SRMNumber, clientIp); + channelStringTime.put(SRMNumber,LocalDateTime.now()); log.info("connect:{}巷道 ", SRMNumber); } - +// static { +// Timer timer = new Timer(); +// timer.scheduleAtFixedRate(new TimerTask() { +// @Override +// public void run() { +// for (String key :channelStringTime.keySet()){ +// if(LocalDateTime.now().equals(channelStringTime.get(key).plusMinutes(5))) { +// channelStringTime.remove(key); +// disConnect(key); +// } +// } +// } +// },0,60000); +// } public static void disConnect(String key){ channelMap.remove(key); } diff --git a/web/src/main/java/com/zhehekeji/web/service/putian/PTDecoder.java b/web/src/main/java/com/zhehekeji/web/service/putian/PTDecoder.java index 1458451..36aa25d 100644 --- a/web/src/main/java/com/zhehekeji/web/service/putian/PTDecoder.java +++ b/web/src/main/java/com/zhehekeji/web/service/putian/PTDecoder.java @@ -5,8 +5,11 @@ import com.zhehekeji.web.service.PlcService; import com.zhehekeji.web.service.client.*; import com.zhehekeji.web.service.ksec.KsecDecoder; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LineBasedFrameDecoder; +import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.formula.functions.T; import org.slf4j.Logger; @@ -44,6 +47,12 @@ public class PTDecoder extends LineBasedFrameDecoder { } protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + //ctx.writeAndFlush(Unpooled.copiedBuffer("1111111", CharsetUtil.UTF_8)); +// // 获取到与 ChannelHandlerContext相关联的 Channel 的引用 +// Channel channel = ctx.channel(); +// // 通过 Channel 写入缓冲区 +// channel.write(Unpooled.copiedBuffer("\n", CharsetUtil.UTF_8)); + ctx.flush(); in = (ByteBuf) super.decode(ctx, in); if(in == null){ log.info("no data"); @@ -73,7 +82,7 @@ public class PTDecoder extends LineBasedFrameDecoder { String body = in.toString(Charset.forName("UTF-8")); tcpLogger.info("PT:{}",body); PTData ptData = new PTData(body); - + log.info(ptData.getType().toString()); if(ptData.getType().equals(PTData.HEART_TYPE)){ //心跳 什么都不处理,因为netty长链接,断开会感知 diff --git a/web/src/main/java/com/zhehekeji/web/service/putian/PTFilter.java b/web/src/main/java/com/zhehekeji/web/service/putian/PTFilter.java index 5307820..356b60e 100644 --- a/web/src/main/java/com/zhehekeji/web/service/putian/PTFilter.java +++ b/web/src/main/java/com/zhehekeji/web/service/putian/PTFilter.java @@ -1,5 +1,6 @@ package com.zhehekeji.web.service.putian; +import com.zhehekeji.web.config.ConfigProperties; import com.zhehekeji.web.service.EmptyCheckService; import com.zhehekeji.web.service.PlcService; import com.zhehekeji.web.service.ksec.*; @@ -26,17 +27,20 @@ public class PTFilter extends ChannelInitializer { private PuTianNettyClient nettyClient; - public PTFilter(PlcService plcService, PuTianNettyClient nettyClient,EmptyCheckService emptyCheckService){ + private ConfigProperties configProperties; + + public PTFilter(PlcService plcService, PuTianNettyClient nettyClient,EmptyCheckService emptyCheckService,ConfigProperties configProperties){ this.plcService = plcService; this.nettyClient = nettyClient; this.emptyCheckService = emptyCheckService; + this.configProperties = configProperties; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline ph = ch.pipeline(); //30秒发一次心跳 - ph.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); + ph.addLast(new IdleStateHandler(0, configProperties.getKsec().getHeartBeatTime() , 0, TimeUnit.SECONDS)); ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes()); ph.addLast(new PTDecoder(1000,true,true,emptyCheckService,plcService)); ph.addLast(new PtEncoder()); diff --git a/web/src/main/java/com/zhehekeji/web/service/putian/PuTianNettyClient.java b/web/src/main/java/com/zhehekeji/web/service/putian/PuTianNettyClient.java index bf9ff5c..202f683 100644 --- a/web/src/main/java/com/zhehekeji/web/service/putian/PuTianNettyClient.java +++ b/web/src/main/java/com/zhehekeji/web/service/putian/PuTianNettyClient.java @@ -19,12 +19,15 @@ import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; +import java.util.Timer; +import java.util.TimerTask; @Slf4j @Component public class PuTianNettyClient { - private static EventLoopGroup group = new NioEventLoopGroup(); + //单线程拒绝多线程导致心跳频繁 + private static EventLoopGroup group = new NioEventLoopGroup(1); @Resource private PlcService plcService; @Resource @@ -43,7 +46,7 @@ public class PuTianNettyClient { client.group(group); client.channel(NioSocketChannel.class); - client.handler(new PTFilter(plcService,this,emptyCheckService)); + client.handler(new PTFilter(plcService,this,emptyCheckService,configProperties)); // 连接服务端 channel = client.connect(ksec.getIp(), ksec.getPort()).sync().channel(); @@ -55,47 +58,31 @@ public class PuTianNettyClient { * @param upId */ public void reconnect(Integer upId) { - Boolean isConnected = false; - int num = 0; + + Timer timer = new Timer(); + ConfigProperties.KSEC ksec = configProperties.getKsec(); - if (ksec == null) { - log.error("reconnect ,upPc is null ,id:{}", upId); - return; - } - try { - Thread.sleep(1500); - } catch (InterruptedException e) { - e.printStackTrace(); - } - while (ksec.getReconnectNum() == -1 || num < ksec.getReconnectNum() && !isConnected) { - //while (num < RECONNECT_NUM && !isConnected) { - try { - Thread.sleep(ksec.getReconnectInterval()); - } catch (InterruptedException e) { - e.printStackTrace(); - } - try { - createClient(ksec); - } catch (Exception e) { -// channel.close(); - //没连上 继续 - log.error("reconnect error num:{}", num); - //关闭当前链接 - num++; -// try{ -// Thread.sleep(ksec.getReconnectInterval()); -// }catch (Exception ex){ -// throw new RuntimeException(ex); -// } - continue; + TimerTask timerTask = new TimerTask() { + int num = 0; + @Override + public void run() { + if (num >=ksec.getReconnectNum() && ksec.getReconnectNum()>=0){ + timer.cancel(); + } + + try { + createClient(ksec); + log.info("plc reconnect success"); + timer.cancel(); + } catch (Exception e) { + //没连上 继续 + log.error("reconnect error num:{}", num); + //关闭当前链接 + num++; + } } - isConnected = true; - } - if (isConnected) { - log.info("plc reconnect success"); - } else { - log.error("plc reconnect error .upPcId:{},reconnect num:{},ip:{},port:{}", upId, num, ksec.getIp(), ksec.getPort()); - } + }; + timer.scheduleAtFixedRate(timerTask,1000,ksec.getReconnectInterval()); } public static void write(PTData ptData){ diff --git a/web/src/main/resources/application-test.yml b/web/src/main/resources/application-test.yml index c53c4e8..46076f3 100644 --- a/web/src/main/resources/application-test.yml +++ b/web/src/main/resources/application-test.yml @@ -14,7 +14,7 @@ spring: testWhileIdle: false timeBetweenEvictionRunsMillis: 60000 type: com.alibaba.druid.pool.DruidDataSource - url: jdbc:mysql://127.0.0.1:3306/lia_duoji?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8 + url: jdbc:mysql://127.0.0.1:3306/lia_duo_nanjingyancao?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8 username: root validationQuery: SELECT 1 FROM DUAL # --------本服务端口号 @@ -52,6 +52,11 @@ serverMode: 1 ksec: ip: 127.0.0.1 port: 3000 + #断点重连的次数:-1->不断重连 + reconnectNum: -1 + #断点重连的时间间隔(单位:ms) + reconnectInterval: 1000 + heartBeatTime: 30 # ------------ 实时视频流 全部页面的格式 行列数量 videoStyleConfig: videoStyleRow: 2 diff --git a/web/src/main/resources/mapper/OrderMapper.xml b/web/src/main/resources/mapper/OrderMapper.xml index bd556c7..d1c2aa2 100644 --- a/web/src/main/resources/mapper/OrderMapper.xml +++ b/web/src/main/resources/mapper/OrderMapper.xml @@ -10,7 +10,7 @@ and t.order_num = #{req.orderNum} - and t.start_time >= #{req.startTimestamp} and t.start_time <= #{req.endTimestamp} + and t.into_stock_time >= #{req.startTimestamp} and t.into_stock_time <= #{req.endTimestamp} order by t.id desc