服务器连接固定一个线程,避免多个线程影响性能和多次心跳

开放心跳参数
 修改服务器连接状态监测
 随行开始时间参数修改
nanjing-yancao-wuliuzhongxin-qsl
LAPTOP-S9HJSOEB\昊天 3 years ago
commit 90a8d47e2d

@ -70,6 +70,12 @@
<scope>system</scope> <scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/libs/jna.jar</systemPath> <systemPath>${project.basedir}/src/main/resources/libs/jna.jar</systemPath>
</dependency> </dependency>
<!--sshj-->
<dependency>
<groupId>com.hierynomus</groupId>
<artifactId>sshj</artifactId>
<version>0.35.0</version>
</dependency>
<dependency> <dependency>
<groupId>com.sun.jna.examples</groupId> <groupId>com.sun.jna.examples</groupId>

@ -94,6 +94,7 @@ public class ConfigProperties {
private Integer port; private Integer port;
private Integer reconnectNum = 10; private Integer reconnectNum = 10;
private long reconnectInterval = 10000; private long reconnectInterval = 10000;
private long heartBeatTime = 30;
} }
@Data @Data

@ -4,13 +4,13 @@ package com.zhehekeji.web.service.client;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashMap; import java.time.LocalDateTime;
import java.util.Map; import java.util.*;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -19,25 +19,32 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
public class ClientChanel { public class ClientChanel {
private static final Logger tcpLogger = LoggerFactory.getLogger("tcp");
static final Logger tcpLogger = LoggerFactory.getLogger("tcp");
/**
* key :
*/
static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
/** /**
* key : * key :
*/ */
private static Map<String, Channel> channelMap = new ConcurrentHashMap<>(); static Map<String, LocalDateTime> channelStringTime = new ConcurrentHashMap<>();
/** /**
* key :IP * key :IP
* value: * value:
*/ */
private static Map<String,String> IP_SRMNumberMap = new ConcurrentHashMap<>(); static Map<String,String> IP_SRMNumberMap = new ConcurrentHashMap<>();
/** /**
* key : * key :
* value: IP * value: IP
*/ */
private static Map<String,String> SRMNumber_IPMap = new ConcurrentHashMap<>(); static Map<String,String> SRMNumber_IPMap = new ConcurrentHashMap<>();
public static void putIp(String ip,String ID){ public static void putIp(String ip,String ID){
IP_SRMNumberMap.put(ip,ID); IP_SRMNumberMap.put(ip,ID);
@ -45,6 +52,7 @@ public class ClientChanel {
public static void putSRMNUmber_Ip(String ID,String ip){ public static void putSRMNUmber_Ip(String ID,String ip){
SRMNumber_IPMap.put(ID,ip); SRMNumber_IPMap.put(ID,ip);
IP_SRMNumberMap.put(ip,ID);
} }
public static String getIpFromId(String ID){ public static String getIpFromId(String ID){
@ -64,9 +72,23 @@ public class ClientChanel {
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String clientIp = socketAddress.getAddress().getHostAddress(); String clientIp = socketAddress.getAddress().getHostAddress();
putSRMNUmber_Ip(SRMNumber, clientIp); putSRMNUmber_Ip(SRMNumber, clientIp);
channelStringTime.put(SRMNumber,LocalDateTime.now());
log.info("connect:{}巷道 ", SRMNumber); log.info("connect:{}巷道 ", SRMNumber);
} }
// static {
// Timer timer = new Timer();
// timer.scheduleAtFixedRate(new TimerTask() {
// @Override
// public void run() {
// for (String key :channelStringTime.keySet()){
// if(LocalDateTime.now().equals(channelStringTime.get(key).plusMinutes(5))) {
// channelStringTime.remove(key);
// disConnect(key);
// }
// }
// }
// },0,60000);
// }
public static void disConnect(String key){ public static void disConnect(String key){
channelMap.remove(key); channelMap.remove(key);
} }

@ -5,8 +5,11 @@ import com.zhehekeji.web.service.PlcService;
import com.zhehekeji.web.service.client.*; import com.zhehekeji.web.service.client.*;
import com.zhehekeji.web.service.ksec.KsecDecoder; import com.zhehekeji.web.service.ksec.KsecDecoder;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T; import org.apache.poi.ss.formula.functions.T;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -44,6 +47,12 @@ public class PTDecoder extends LineBasedFrameDecoder {
} }
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//ctx.writeAndFlush(Unpooled.copiedBuffer("1111111", CharsetUtil.UTF_8));
// // 获取到与 ChannelHandlerContext相关联的 Channel 的引用
// Channel channel = ctx.channel();
// // 通过 Channel 写入缓冲区
// channel.write(Unpooled.copiedBuffer("\n", CharsetUtil.UTF_8));
ctx.flush();
in = (ByteBuf) super.decode(ctx, in); in = (ByteBuf) super.decode(ctx, in);
if(in == null){ if(in == null){
log.info("no data"); log.info("no data");
@ -73,7 +82,7 @@ public class PTDecoder extends LineBasedFrameDecoder {
String body = in.toString(Charset.forName("UTF-8")); String body = in.toString(Charset.forName("UTF-8"));
tcpLogger.info("PT:{}",body); tcpLogger.info("PT:{}",body);
PTData ptData = new PTData(body); PTData ptData = new PTData(body);
log.info(ptData.getType().toString());
if(ptData.getType().equals(PTData.HEART_TYPE)){ if(ptData.getType().equals(PTData.HEART_TYPE)){
//心跳 什么都不处理,因为netty长链接断开会感知 //心跳 什么都不处理,因为netty长链接断开会感知

@ -1,5 +1,6 @@
package com.zhehekeji.web.service.putian; package com.zhehekeji.web.service.putian;
import com.zhehekeji.web.config.ConfigProperties;
import com.zhehekeji.web.service.EmptyCheckService; import com.zhehekeji.web.service.EmptyCheckService;
import com.zhehekeji.web.service.PlcService; import com.zhehekeji.web.service.PlcService;
import com.zhehekeji.web.service.ksec.*; import com.zhehekeji.web.service.ksec.*;
@ -26,17 +27,20 @@ public class PTFilter extends ChannelInitializer<SocketChannel> {
private PuTianNettyClient nettyClient; private PuTianNettyClient nettyClient;
public PTFilter(PlcService plcService, PuTianNettyClient nettyClient,EmptyCheckService emptyCheckService){ private ConfigProperties configProperties;
public PTFilter(PlcService plcService, PuTianNettyClient nettyClient,EmptyCheckService emptyCheckService,ConfigProperties configProperties){
this.plcService = plcService; this.plcService = plcService;
this.nettyClient = nettyClient; this.nettyClient = nettyClient;
this.emptyCheckService = emptyCheckService; this.emptyCheckService = emptyCheckService;
this.configProperties = configProperties;
} }
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline ph = ch.pipeline(); ChannelPipeline ph = ch.pipeline();
//30秒发一次心跳 //30秒发一次心跳
ph.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS)); ph.addLast(new IdleStateHandler(0, configProperties.getKsec().getHeartBeatTime() , 0, TimeUnit.SECONDS));
ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes()); ByteBuf byteBuf = Unpooled.copiedBuffer("$".getBytes());
ph.addLast(new PTDecoder(1000,true,true,emptyCheckService,plcService)); ph.addLast(new PTDecoder(1000,true,true,emptyCheckService,plcService));
ph.addLast(new PtEncoder()); ph.addLast(new PtEncoder());

@ -19,12 +19,15 @@ import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Timer;
import java.util.TimerTask;
@Slf4j @Slf4j
@Component @Component
public class PuTianNettyClient { public class PuTianNettyClient {
private static EventLoopGroup group = new NioEventLoopGroup(); //单线程拒绝多线程导致心跳频繁
private static EventLoopGroup group = new NioEventLoopGroup(1);
@Resource @Resource
private PlcService plcService; private PlcService plcService;
@Resource @Resource
@ -43,7 +46,7 @@ public class PuTianNettyClient {
client.group(group); client.group(group);
client.channel(NioSocketChannel.class); client.channel(NioSocketChannel.class);
client.handler(new PTFilter(plcService,this,emptyCheckService)); client.handler(new PTFilter(plcService,this,emptyCheckService,configProperties));
// 连接服务端 // 连接服务端
channel = client.connect(ksec.getIp(), ksec.getPort()).sync().channel(); channel = client.connect(ksec.getIp(), ksec.getPort()).sync().channel();
@ -55,47 +58,31 @@ public class PuTianNettyClient {
* @param upId * @param upId
*/ */
public void reconnect(Integer upId) { public void reconnect(Integer upId) {
Boolean isConnected = false;
int num = 0; Timer timer = new Timer();
ConfigProperties.KSEC ksec = configProperties.getKsec(); ConfigProperties.KSEC ksec = configProperties.getKsec();
if (ksec == null) { TimerTask timerTask = new TimerTask() {
log.error("reconnect ,upPc is null ,id:{}", upId); int num = 0;
return; @Override
} public void run() {
try { if (num >=ksec.getReconnectNum() && ksec.getReconnectNum()>=0){
Thread.sleep(1500); timer.cancel();
} catch (InterruptedException e) { }
e.printStackTrace();
} try {
while (ksec.getReconnectNum() == -1 || num < ksec.getReconnectNum() && !isConnected) { createClient(ksec);
//while (num < RECONNECT_NUM && !isConnected) { log.info("plc reconnect success");
try { timer.cancel();
Thread.sleep(ksec.getReconnectInterval()); } catch (Exception e) {
} catch (InterruptedException e) { //没连上 继续
e.printStackTrace(); log.error("reconnect error num:{}", num);
} //关闭当前链接
try { num++;
createClient(ksec); }
} catch (Exception e) {
// channel.close();
//没连上 继续
log.error("reconnect error num:{}", num);
//关闭当前链接
num++;
// try{
// Thread.sleep(ksec.getReconnectInterval());
// }catch (Exception ex){
// throw new RuntimeException(ex);
// }
continue;
} }
isConnected = true; };
} timer.scheduleAtFixedRate(timerTask,1000,ksec.getReconnectInterval());
if (isConnected) {
log.info("plc reconnect success");
} else {
log.error("plc reconnect error .upPcId:{},reconnect num:{},ip:{},port:{}", upId, num, ksec.getIp(), ksec.getPort());
}
} }
public static void write(PTData ptData){ public static void write(PTData ptData){

@ -14,7 +14,7 @@ spring:
testWhileIdle: false testWhileIdle: false
timeBetweenEvictionRunsMillis: 60000 timeBetweenEvictionRunsMillis: 60000
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
url: jdbc:mysql://127.0.0.1:3306/lia_duoji?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8 url: jdbc:mysql://127.0.0.1:3306/lia_duo_nanjingyancao?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
username: root username: root
validationQuery: SELECT 1 FROM DUAL validationQuery: SELECT 1 FROM DUAL
# --------本服务端口号 # --------本服务端口号
@ -52,6 +52,11 @@ serverMode: 1
ksec: ksec:
ip: 127.0.0.1 ip: 127.0.0.1
port: 3000 port: 3000
#断点重连的次数:-1->不断重连
reconnectNum: -1
#断点重连的时间间隔(单位ms)
reconnectInterval: 1000
heartBeatTime: 30
# ------------ 实时视频流 全部页面的格式 行列数量 # ------------ 实时视频流 全部页面的格式 行列数量
videoStyleConfig: videoStyleConfig:
videoStyleRow: 2 videoStyleRow: 2

@ -10,7 +10,7 @@
and t.order_num = #{req.orderNum} and t.order_num = #{req.orderNum}
</if> </if>
<if test="req.startTimestamp != null and req.endTimestamp != null"> <if test="req.startTimestamp != null and req.endTimestamp != null">
and t.start_time >= #{req.startTimestamp} and t.start_time &lt;= #{req.endTimestamp} and t.into_stock_time >= #{req.startTimestamp} and t.into_stock_time &lt;= #{req.endTimestamp}
</if> </if>
</where> </where>
order by t.id desc order by t.id desc

Loading…
Cancel
Save