From e45ec50b9ebe8423bda6ecfd1662aa3d76b3e5b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E4=B8=80=E9=B8=A3?= Date: Thu, 14 Jan 2021 15:14:25 +0800 Subject: [PATCH] add tcp --- web/pom.xml | 6 +- .../com/zhehekeji/web/lib/LoginModule.java | 30 ++--- .../zhehekeji/web/service/InitService.java | 35 ++++-- .../web/service/MyProtocolDecoder.java | 118 ++++++++++++++++++ .../zhehekeji/web/service/TcpListener.java | 57 +++++++++ web/src/main/resources/application-dev.yml | 1 + web/src/main/resources/application-test.yml | 5 +- web/src/main/resources/logback-spring.xml | 26 +--- 8 files changed, 226 insertions(+), 52 deletions(-) create mode 100644 web/src/main/java/com/zhehekeji/web/service/MyProtocolDecoder.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/TcpListener.java diff --git a/web/pom.xml b/web/pom.xml index 2dcd267..5338991 100644 --- a/web/pom.xml +++ b/web/pom.xml @@ -38,7 +38,11 @@ common 1.0.0 - + + io.netty + netty-all + 4.1.50.Final + com.zhehekeji filter diff --git a/web/src/main/java/com/zhehekeji/web/lib/LoginModule.java b/web/src/main/java/com/zhehekeji/web/lib/LoginModule.java index ab205a4..6cc7ef0 100644 --- a/web/src/main/java/com/zhehekeji/web/lib/LoginModule.java +++ b/web/src/main/java/com/zhehekeji/web/lib/LoginModule.java @@ -44,21 +44,21 @@ public class LoginModule { //Res res = Res.string(); //打开日志,可选 - NetSDKLib.LOG_SET_PRINT_INFO setLog = new NetSDKLib.LOG_SET_PRINT_INFO(); - File path = new File("./sdklog/"); - if (!path.exists()) { - path.mkdir(); - } - String logPath = path.getAbsoluteFile().getParent() + "\\sdklog\\" + ToolKits.getDate() + ".log"; - setLog.nPrintStrategy = 0; - setLog.bSetFilePath = 1; - System.arraycopy(logPath.getBytes(), 0, setLog.szLogFilePath, 0, logPath.getBytes().length); - System.out.println(logPath); - setLog.bSetPrintStrategy = 1; - bLogopen = netsdk.CLIENT_LogOpen(setLog); - if(!bLogopen ) { - System.err.println("Failed to open NetSDK log"); - } +// NetSDKLib.LOG_SET_PRINT_INFO setLog = new NetSDKLib.LOG_SET_PRINT_INFO(); +// File path = new File("./sdklog/"); +// if (!path.exists()) { +// path.mkdir(); +// } +// String logPath = path.getAbsoluteFile().getParent() + "\\sdklog\\" + ToolKits.getDate() + ".log"; +// setLog.nPrintStrategy = 0; +// setLog.bSetFilePath = 1; +// System.arraycopy(logPath.getBytes(), 0, setLog.szLogFilePath, 0, logPath.getBytes().length); +// System.out.println(logPath); +// setLog.bSetPrintStrategy = 1; +// bLogopen = netsdk.CLIENT_LogOpen(setLog); +// if(!bLogopen ) { +// System.err.println("Failed to open NetSDK log"); +// } // 设置断线重连回调接口,设置过断线重连成功回调函数后,当设备出现断线情况,SDK内部会自动进行重连操作 // 此操作为可选操作,但建议用户进行设置 diff --git a/web/src/main/java/com/zhehekeji/web/service/InitService.java b/web/src/main/java/com/zhehekeji/web/service/InitService.java index 4073d45..c5f0617 100644 --- a/web/src/main/java/com/zhehekeji/web/service/InitService.java +++ b/web/src/main/java/com/zhehekeji/web/service/InitService.java @@ -21,19 +21,36 @@ public class InitService implements ApplicationRunner { @Resource private CameraMapper cameraMapper; + @Resource + private TcpListener tcpListener; @Override public void run(ApplicationArguments args) throws Exception { List cameras = cameraMapper.selectByMap(new HashMap<>(0)); - cameras.forEach(camera -> { - NetSDKLib.LLong lLong = LoginModule.login(camera.getIp(),camera.getPort(),camera.getUser(),camera.getPassword(),camera.getId()); - if(lLong.longValue() <= 0){ - log.error("初始相机 登录失败:cameraId{}",camera.getId()); - }else { - CameraConnMap.conn(camera.getId(),lLong); - log.debug("初始相机登录成功 cameraId:{}",camera.getId()); - } - }); + loginThread loginThread = new loginThread(cameras); + loginThread.start(); + tcpListener.run(); + + } + + class loginThread extends Thread{ + + List cameras; + public loginThread(List cameras){ + this.cameras = cameras; + } + @Override + public void run() { + cameras.forEach(camera -> { + NetSDKLib.LLong lLong = LoginModule.login(camera.getIp(),camera.getPort(),camera.getUser(),camera.getPassword(),camera.getId()); + if(lLong.longValue() <= 0){ + log.error("初始相机 登录失败:cameraId{}",camera.getId()); + }else { + CameraConnMap.conn(camera.getId(),lLong); + log.debug("初始相机登录成功 cameraId:{}",camera.getId()); + } + }); + } } } diff --git a/web/src/main/java/com/zhehekeji/web/service/MyProtocolDecoder.java b/web/src/main/java/com/zhehekeji/web/service/MyProtocolDecoder.java new file mode 100644 index 0000000..cd44ac7 --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/MyProtocolDecoder.java @@ -0,0 +1,118 @@ +package com.zhehekeji.web.service; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.Charset; +import java.util.concurrent.ThreadPoolExecutor; + +@Slf4j +public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder { + + private ThreadPoolExecutor threadPoolExecutor; + + private OrderService orderService; + /** + * + * @param maxFrameLength 帧的最大长度 + * @param lengthFieldOffset length字段偏移的地址 + * @param lengthFieldLength length字段所占的字节长 + * @param lengthAdjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段 + * @param initialBytesToStrip 解析时候跳过多少个长度 + * @param failFast 为true,当frame长度超过maxFrameLength时立即报TooLongFrameException异常,为false,读取完整个帧再报异 + */ + + public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast, ThreadPoolExecutor threadPoolExecutor, OrderService orderService) { + + super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); + this.threadPoolExecutor = threadPoolExecutor; + this.orderService = orderService; + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + in = (ByteBuf) super.decode(ctx,in); + + if(in == null){ + return null; + } + //包头 `L` `P` + char l = in.readChar(); + char p = in.readChar(); + if(l != 76 && p != 80){ + //不是包头 丢 + return null; + } + short length = in.readShort(); + log.debug("length:{}",length); + CharSequence charSequence = in.readCharSequence(6, Charset.defaultCharset()); + String plcId = charSequence.toString(); + log.debug("plcId:{}",plcId); + // OA=心跳 OB=工单 OC=任务 OD=告警 + CharSequence charSequence1 = in.readCharSequence(2,Charset.defaultCharset()); + String type = charSequence1.toString(); + log.debug("type:{}",type); + CharSequence charSequence2 = in.readCharSequence(20,Charset.defaultCharset()); + String orderNum = charSequence2.toString(); + log.debug("订单号:{}",orderNum); + char maohao = in.readChar(); + log.debug("冒号:{}",maohao); + char leixing = in.readChar(); + log.debug("---{}",leixing); + in.readBytes(8); + +// //读取type字段 +// byte header1 = in.readByte(); +// byte header2 = in.readByte(); +// byte header3 = in.readByte(); +// byte header4 = in.readByte(); +// byte header5 = in.readByte(); +// byte header6 = in.readByte(); +// if(header1 != 20 && header2 != 20 && header3 != 9 && header4 != 7 && header5 != 11 && header6 != 30){ +// //不是包头 直接丢弃 +// return null; +// } +// long length = in.readUnsignedInt(); +// if(in.readableBytes()!=length){ +// throw new Exception("标记的长度不符合实际长度"); +// } +// //读取body +// byte []bytes = new byte[in.readableBytes()]; +// in.readBytes(bytes); +// String string = new String(bytes,"UTF-8"); +// String substring = string.substring(string.indexOf("{"), string.lastIndexOf("}") + 1); +// JSONObject jsonObject = JSONObject.parseObject(substring); +// String deviceId = jsonObject.getString("device_id"); +// log.debug("tcp deviceId:{}",deviceId); +// String dataLength = jsonObject.getString("data_length"); +// Integer flawType = jsonObject.getInteger("msg"); +// String detectStatus = ""; +// if(flawType > 0){ +// detectStatus = "NG"; +// }else { +// detectStatus = "OK"; +// } +// String result = jsonObject.getString("result"); +// if(!StringUtils.isEmpty(result) && !"{}".equals(result)){ +// log.debug(result); +// AdaptData adaptData = JSONObject.parseObject(result,AdaptData.class); +// if(adaptData != null){ +// FlawRunnable flawRunnable = new FlawRunnable(jdbcTemplate,deviceId,"flaw",adaptData); +// threadPoolExecutor.execute(flawRunnable); +// } +// }else { +// log.warn("result为空:{}",result); +// } +// String s2 = string.substring(string.lastIndexOf("}") + 1, string.length()); +// WebSocketMapUtil.send(deviceId,s2,detectStatus); + in.release(); + + return null; + + } + + + +} diff --git a/web/src/main/java/com/zhehekeji/web/service/TcpListener.java b/web/src/main/java/com/zhehekeji/web/service/TcpListener.java new file mode 100644 index 0000000..71cc16b --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/TcpListener.java @@ -0,0 +1,57 @@ +package com.zhehekeji.web.service; + + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Component +@Slf4j +public class TcpListener { + + @Value("${tcpPort}") + private Integer tcpPort; + + private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 100, 200, + TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10000000)); + + @Resource + private OrderService orderService; + + public void run(){ + EventLoopGroup boss = new NioEventLoopGroup(); + EventLoopGroup work = new NioEventLoopGroup(); + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(boss,work).channel(NioServerSocketChannel.class).localAddress(tcpPort).childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline().addLast("handler", new MyProtocolDecoder(1000*10000,6,4,0,0,false,threadPoolExecutor,orderService)); // 自定义业务逻辑处理器 + } + }).childOption(ChannelOption.SO_KEEPALIVE, true); + ChannelFuture future = bootstrap.bind().sync(); + log.info("netty server started, Listening on " + tcpPort); + future.channel().closeFuture().sync(); + }catch (Exception e){ + log.error("netty出错:{}"+e); + e.printStackTrace(); + }finally { + work.shutdownGracefully(); + boss.shutdownGracefully(); + } + } + +} diff --git a/web/src/main/resources/application-dev.yml b/web/src/main/resources/application-dev.yml index b1c44d0..758ee76 100644 --- a/web/src/main/resources/application-dev.yml +++ b/web/src/main/resources/application-dev.yml @@ -38,3 +38,4 @@ cameraUser: admin cameraPort: 37777 mediaPath: d:\\media\ mp4Path: d:\\mp4\ +tcpPort: 2021 diff --git a/web/src/main/resources/application-test.yml b/web/src/main/resources/application-test.yml index 59057f7..f16a065 100644 --- a/web/src/main/resources/application-test.yml +++ b/web/src/main/resources/application-test.yml @@ -36,5 +36,6 @@ picPort: 8544 cameraPassword: hzleaper123 cameraUser: admin cameraPort: 37777 -mediaPath: d:\\media\ -mp4Path: d:\\media_mp4\ +mediaPath: /home/test/lia/media/ +mp4Path: /home/test/lia/mp4/ +tcpPort: 2021 diff --git a/web/src/main/resources/logback-spring.xml b/web/src/main/resources/logback-spring.xml index cc623b6..c8a3364 100644 --- a/web/src/main/resources/logback-spring.xml +++ b/web/src/main/resources/logback-spring.xml @@ -88,32 +88,8 @@ - - - - - - - - - - - - - - - - - - - - - - - - - +