From b16f24dd96cb89241b768ede7511c9e1a3c99cc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?LAPTOP-S9HJSOEB=5C=E6=98=8A=E5=A4=A9?= Date: Wed, 15 May 2024 14:27:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/service/client/ClientChanel.java | 111 +++++++++++++ .../zhehekeji/web/service/client/Decoder.java | 131 +++++++++++++++ .../zhehekeji/web/service/client/Encoder.java | 24 +++ .../service/client/IntelliBlinkClient.java | 4 + .../service/client/NettyConnectHandler.java | 44 +++++ .../web/service/client/NettyServer.java | 60 +++++++ .../web/service/client/TransmissionPojo.java | 156 ++++++++++++++++++ .../web/service/client/TransmissionType.java | 14 ++ 8 files changed, 544 insertions(+) create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/Decoder.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/Encoder.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/IntelliBlinkClient.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/NettyConnectHandler.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/TransmissionPojo.java create mode 100644 web/src/main/java/com/zhehekeji/web/service/client/TransmissionType.java diff --git a/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java b/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java new file mode 100644 index 0000000..4231f2d --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/ClientChanel.java @@ -0,0 +1,111 @@ +package com.zhehekeji.web.service.client; + + +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; +import java.time.LocalDateTime; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 所有的客户端的chanel + */ +@Slf4j +public class ClientChanel { + + + static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); + + /** + * key : 巷道标识符 + */ + static Map channelMap = new ConcurrentHashMap<>(); + + + /** + * key : 巷道标识符 + */ + static Map channelStringTime = new ConcurrentHashMap<>(); + + + /** + * key :IP + * value: 巷道标识符 + */ + static Map IP_SRMNumberMap = new ConcurrentHashMap<>(); + + /** + * key :巷道标识符 + * value: IP + */ + static Map SRMNumber_IPMap = new ConcurrentHashMap<>(); + + public static void putIp(String ip,String ID){ + IP_SRMNumberMap.put(ip,ID); + } + + public static void putSRMNUmber_Ip(String ID,String ip){ + SRMNumber_IPMap.put(ID,ip); + IP_SRMNumberMap.put(ip,ID); + } + + public static String getIpFromId(String ID){ + return SRMNumber_IPMap.get(ID); + } + + public static void deleteIp(String ip){ + IP_SRMNumberMap.remove(ip); + } + + public static String getIDFromIp(String ip){ + return IP_SRMNumberMap.get(ip); + } + + public static void connect(String SRMNumber, Channel channel){ + channelMap.put(SRMNumber,channel); + InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress(); + String clientIp = socketAddress.getAddress().getHostAddress(); + putSRMNUmber_Ip(SRMNumber, clientIp); + channelStringTime.put(SRMNumber,LocalDateTime.now()); + log.info("connect:{}巷道 ", SRMNumber); + } +// static { +// Timer timer = new Timer(); +// timer.scheduleAtFixedRate(new TimerTask() { +// @Override +// public void run() { +// for (String key :channelStringTime.keySet()){ +// if(LocalDateTime.now().equals(channelStringTime.get(key).plusMinutes(5))) { +// channelStringTime.remove(key); +// disConnect(key); +// } +// } +// } +// },0,60000); +// } + public static void disConnect(String key){ + channelMap.remove(key); + } + + public static Set keys(){ + return channelMap.keySet(); + } + + public static Channel get(String key){ + return channelMap.get(key); + } + + public static void write(String data,String key){ + if(channelMap.get(key) != null){ + channelMap.get(key).writeAndFlush(data); + }else { + tcpLogger.info("no connect client:{}",key); + } + } + +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/Decoder.java b/web/src/main/java/com/zhehekeji/web/service/client/Decoder.java new file mode 100644 index 0000000..ca85c3f --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/Decoder.java @@ -0,0 +1,131 @@ +package com.zhehekeji.web.service.client; + +import com.zhehekeji.web.lib.CameraDelayTask; +import com.zhehekeji.web.service.EmptyCheckService; +import com.zhehekeji.web.service.PlcService; +import com.zhehekeji.web.service.ksec.KsecInfo; +import com.zhehekeji.web.service.ksec.KsecNettyClient; +import com.zhehekeji.web.service.putian.*; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.Charset; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 客户端解码器 连接用 + */ +@Slf4j +public class Decoder extends DelimiterBasedFrameDecoder { + + private static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); + + public static String START_CHECK = "ST"; + private static String GET_PHOTO = "GP"; + private static String GET_PHOTO_END = "GPE"; + private static String RETURN_CHECK = "RTE"; + private static String FOLLOW_GET_PHOTO_END = "SGPE"; + private static String END_CHECK = "EN"; + private static String HEART_BEAT = "HB"; + + + + private static String END_STRING = "$"; + + private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,15,30, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(20000)); + + + private PlcService plcService; + + public Decoder(PlcService plcService) { + + super(20000,true,false, Unpooled.copiedBuffer(">".getBytes()), + Unpooled.copiedBuffer("$".getBytes())); + this.plcService = plcService; + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + + in = (ByteBuf) super.decode(ctx, in); + if(in == null){ + log.debug("no data"); + return null; + } + ClientRunnable clientRunnable = new ClientRunnable(in,ctx,plcService); + threadPoolExecutor.execute(clientRunnable); + return null; + } + + public static class ClientRunnable implements Runnable { + + private ByteBuf in; + + private ChannelHandlerContext ctx; + + private PlcService plcService; + + public ClientRunnable(ByteBuf in,ChannelHandlerContext ctx,PlcService plcService){ + this.ctx = ctx; + this.in = in; + this.plcService = plcService; + } + + @Override + public void run() { + String body = in.toString(Charset.forName("UTF-8")); + tcpLogger.info("receive client:{}, data length:{}", body, body.length()); + TransmissionPojo transmissionPojo = new TransmissionPojo(body); + //心跳进行连接 + if(HEART_BEAT.equals(transmissionPojo.getHeader())){ + ClientChanel.connect(transmissionPojo.getStreetNumber(),ctx.channel()); + } + //获取照片后发送进行计算逻辑,并在拍照队列中取出拍照发送 + else if(GET_PHOTO_END.equals(transmissionPojo.getHeader())){ + if(transmissionPojo.getTaskId() != null && !"0".equals(transmissionPojo.getTaskId())) { + ClientChanel.get(transmissionPojo.getStreetNumber()).writeAndFlush(transmissionPojo.toString(TransmissionType.RTS)); + } + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + //删除队列的拍照数据 + GetPhotoDelayExecutor.remove(transmissionPojo.getStreetNumber(),transmissionPojo.toString(TransmissionType.GPS)); + //发送给上位机 + CameraDelayTask cameraDelayTask = GetPhotoDelayExecutor.getNext(transmissionPojo.getStreetNumber()); + if (cameraDelayTask != null) { + TransmissionPojo pojo = new TransmissionPojo(cameraDelayTask.getCommand()); + ClientChanel.get(pojo.getStreetNumber()).writeAndFlush(pojo.toString(TransmissionType.GPS)); + } + + } + //照片和结果保存,并发送给上位机 + else if(RETURN_CHECK.equals(transmissionPojo.getHeader())){ + //保存数据 + plcService.visualCalculationResults(transmissionPojo); + //发送给上位机 + KsecInfo ksecInfo = plcService.getKsecDataInfo(transmissionPojo,"E"); + ksecInfo.getData().setTypeNum(transmissionPojo.getCategory()); + ksecInfo.getData().setQuantity(transmissionPojo.getCount()); + ksecInfo.getData().setCheckRlt(transmissionPojo.getResult()); + KsecNettyClient.write(ksecInfo); + + + + } + //照片和结果保存,并发送给上位机 + else if(FOLLOW_GET_PHOTO_END.equals(transmissionPojo.getHeader())){ + //保存数据 + plcService.saveFollowPhoto(transmissionPojo); + } + } + } +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/Encoder.java b/web/src/main/java/com/zhehekeji/web/service/client/Encoder.java new file mode 100644 index 0000000..a523252 --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/Encoder.java @@ -0,0 +1,24 @@ +package com.zhehekeji.web.service.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; + +/** + * 客户端传输协议 以;结尾 + */ +public class Encoder extends MessageToByteEncoder { + private static String END_STRING = ">"; + + private static final Logger tcpLogger = LoggerFactory.getLogger("tcp"); + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception { + data = data + END_STRING; + tcpLogger.info("send to client:{}, length:{}",data, data.length()); + byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/IntelliBlinkClient.java b/web/src/main/java/com/zhehekeji/web/service/client/IntelliBlinkClient.java new file mode 100644 index 0000000..d594f10 --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/IntelliBlinkClient.java @@ -0,0 +1,4 @@ +package com.zhehekeji.web.service.client; + +public class IntelliBlinkClient { +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/NettyConnectHandler.java b/web/src/main/java/com/zhehekeji/web/service/client/NettyConnectHandler.java new file mode 100644 index 0000000..fe55324 --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/NettyConnectHandler.java @@ -0,0 +1,44 @@ +package com.zhehekeji.web.service.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; + +/** + * 客户端的上下线 + * + * @author Administrator + * + */ +@Slf4j +public class NettyConnectHandler extends ChannelInboundHandlerAdapter { + + /** + * 建立连接时 + */ + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = socketAddress.getAddress().getHostAddress(); + int clientPort = socketAddress.getPort(); + log.info("ip:{} port:{} connected",clientIp, clientPort); + ctx.fireChannelActive(); + } + + /** + * 关闭连接时 + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + String clientIp = socketAddress.getAddress().getHostAddress(); + int clientPort = socketAddress.getPort(); + log.info("ip:{} port:{} disconnected",clientIp, clientPort); + String ID = ClientChanel.getIDFromIp(clientIp); + //设置客户端下线 + ClientChanel.disConnect(ID); + } + +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java b/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java new file mode 100644 index 0000000..ec4266d --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/NettyServer.java @@ -0,0 +1,60 @@ +package com.zhehekeji.web.service.client; + +import com.zhehekeji.web.service.EmptyCheckService; +import com.zhehekeji.web.service.PlcService; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.SneakyThrows; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +@Component +public class NettyServer { + + @Resource + private PlcService plcService; + + public void CreateNettyServer(int port) throws InterruptedException { + Thread thread = new Thread(new Runnable() { + @SneakyThrows + @Override + public void run() { + EventLoopGroup bossGroup = new NioEventLoopGroup(); + // 从线程组, 老板线程组会把任务丢给他,让手下线程组去做任务 + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + // netty服务器的创建, 辅助工具类,用于服务器通道的一系列配置 + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(bossGroup, workerGroup) //绑定两个线程组 + .channel(NioServerSocketChannel.class) //指定NIO的模式 + + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new NettyConnectHandler()); + ch.pipeline().addLast(new Decoder(plcService)); + ch.pipeline().addLast(new Encoder()); + }}); // 子处理器,用于处理workerGroup + + // 启动server,并且设置8088为启动的端口号,同时启动方式为同步 + ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); + + // 监听关闭的channel,设置位同步方式 + channelFuture.channel().closeFuture().sync(); + } finally { + //退出线程组 + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + }); + thread.start(); + } +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/TransmissionPojo.java b/web/src/main/java/com/zhehekeji/web/service/client/TransmissionPojo.java new file mode 100644 index 0000000..6911619 --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/TransmissionPojo.java @@ -0,0 +1,156 @@ +package com.zhehekeji.web.service.client; + +import com.zhehekeji.web.entity.CheckLog; +import com.zhehekeji.web.entity.Street; +import lombok.Data; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +@Data +public class TransmissionPojo { + private String data; + private String[] dataArray; + private String header; + private String streetNumber;//巷道标识符 + + private Integer row;//行 + private Integer column;//列 + + private String pcd;//点位文件路径 + private String taskId; + private String checkId; + private Integer direction;//左右 + private Integer count = 0; //数量 + private String category = ""; //品规 + private Set picsPath; //照片 + + + private Integer result; + public TransmissionPojo(CheckLog checkLog , Street street){ + this.checkId = checkLog.getId().toString(); + this.streetNumber = street.getPlcId(); + this.taskId = checkLog.getLotnum(); + this.row = checkLog.getRow(); + this.column = checkLog.getColumn(); + this.direction = checkLog.getDirection(); + this.count = checkLog.getCount(); + if(checkLog.getCategory() != null && checkLog.getCategory().contains("/")) { + this.category = checkLog.getCategory().split("/")[0]; + }else { + this.category = checkLog.getCategory(); + } + + } + + /** + * 将给定数字转换为指定长度(默认3位)的字符,不足部分用“0”补全。 + * + * @param number 需要转换的数字 + * @param length 目标字符长度(默认3) + * @return 符合要求的字符串 + */ + public static String convertNumberToFixedLengthChars(int number, int length) { + if (length < 1) { + throw new IllegalArgumentException("Length must be a positive integer."); + } + + String numStr = Integer.toString(number); + while (numStr.length() < length) { + numStr = "0" + numStr; + } + return numStr; + } + public TransmissionPojo( Street street,Integer row , Integer column,Integer direction,String orderId){ + this.streetNumber = street.getPlcId(); + this.row = row; + + this.taskId = "0"; + this.direction = direction; + this.column = column; + this.checkId = convertNumberToFixedLengthChars(row, 3) + convertNumberToFixedLengthChars(column, 3); + + } + public String toString(TransmissionType type) { + this.header = type.toString(); + if (type == TransmissionType.ST) { + return "ST&" + streetNumber + "&" + checkId + "&" + taskId + "&" + direction ; + }else if (type == TransmissionType.GPS) { + return "GPS&" + streetNumber +"/"+row +"/"+column+ "&" + checkId + "&" + taskId + "&" + direction+ "&" + category + "&" + count; + }else if (type == TransmissionType.RTS) { + return "RTS&" + streetNumber +"/"+row +"/"+column+ "&" + checkId + "&" + taskId + "&" + direction + "&" + category + "&" + count; + } + return ""; + } + + public TransmissionPojo(String data) { + this.data = data; + data = data.replace(">", ""); + this.dataArray = data.split("&"); + this.header = dataArray[0]; + if(dataArray[1].contains("/")) { + String[] dataArr = dataArray[1].split("/"); + this.streetNumber = dataArr[0]; + this.row = Integer.valueOf(dataArr[1]); + if(dataArr.length > 2) { + this.column = Integer.valueOf(dataArr[2]); + } + }else { + this.streetNumber = dataArray[1]; + } + if (dataArray.length > 2) { + this.checkId = dataArray[2]; + this.taskId = (dataArray[3]); + if(dataArray[4] != null && !"".equals(dataArray[4]) && !"null".equals(dataArray[4])) { + this.direction = Integer.valueOf(dataArray[4]); + } + } + + if (dataArray.length > 5) { + if(dataArray[5] != null &&!"".equals(dataArray[5]) && !"null".equals(dataArray[5]) ) { + + this.category = dataArray[5]; + } + if( dataArray.length > 6 && dataArray[6] != null && !"".equals(dataArray[6]) && ! "null".equals(dataArray[5])) { + this.count = Integer.valueOf(dataArray[6]); + } + } + if (dataArray.length > 7) { + this.result = Integer.parseInt(dataArray[7])==0 ?0:1; + //不记录2d图像 + //String path2D = "/fileData/"+this.taskId+"/"+this.checkId+"/"+this.direction+".jpg"; + String path3D = "/fileData/"+this.taskId+"/"+this.checkId+"/"+(this.direction+2)+".PNG"; + Set strings = new HashSet<>(); + //strings.add(path2D); + strings.add(path3D); + + // 将数组中的元素添加到 Set 集合中 + if(dataArray.length > 8 && dataArray[8].split(";").length>0) { + String[] s = dataArray[8].split(";"); + for (int i = 0; i < s.length; i++) { + s[i] = s[i].replace("\\\\","/"); + s[i] = s[i].replace("\\","/"); + if(s[i].startsWith(".")) { + s[i] = s[i].replaceFirst(".", ""); // 移除单个字符 + } + // 或者 + // stringArray[i] = stringArray[i].replaceAll(regexToRemove, ""); // 使用正则表达式移除字符类别 + } + Collections.addAll(strings,s); + strings.remove(""); + } + this.picsPath = strings; + + } + if(this.taskId != null && !"".equals(this.taskId) && this.checkId != null && !"".equals(this.checkId)) + this.pcd = "/fileData/"+this.taskId+"/"+this.checkId+"/"+(this.direction+2)+".pcd"; + + } + + public static void main(String[] args) { + TransmissionPojo transmissionPojo = new TransmissionPojo("GPE&002/1/18&7&2&123&41111&30&0&fff;.\\\\f.png;/f.png>"); + System.out.println(transmissionPojo); + } + +} diff --git a/web/src/main/java/com/zhehekeji/web/service/client/TransmissionType.java b/web/src/main/java/com/zhehekeji/web/service/client/TransmissionType.java new file mode 100644 index 0000000..40d82f6 --- /dev/null +++ b/web/src/main/java/com/zhehekeji/web/service/client/TransmissionType.java @@ -0,0 +1,14 @@ +package com.zhehekeji.web.service.client; + +public enum TransmissionType { + ST , + GPS , + GPE , + SGPS , + SGPE , + RTS , + RTE , + EN , + ZHB ; + +}