增加客户端控制
parent
6a80d18cd7
commit
b16f24dd96
@ -0,0 +1,111 @@
|
||||
package com.zhehekeji.web.service.client;
|
||||
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 所有的客户端的chanel
|
||||
*/
|
||||
@Slf4j
|
||||
public class ClientChanel {
|
||||
|
||||
|
||||
static final Logger tcpLogger = LoggerFactory.getLogger("tcp");
|
||||
|
||||
/**
|
||||
* key : 巷道标识符
|
||||
*/
|
||||
static Map<String, Channel> channelMap = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
/**
|
||||
* key : 巷道标识符
|
||||
*/
|
||||
static Map<String, LocalDateTime> channelStringTime = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
/**
|
||||
* key :IP
|
||||
* value: 巷道标识符
|
||||
*/
|
||||
static Map<String,String> IP_SRMNumberMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* key :巷道标识符
|
||||
* value: IP
|
||||
*/
|
||||
static Map<String,String> SRMNumber_IPMap = new ConcurrentHashMap<>();
|
||||
|
||||
public static void putIp(String ip,String ID){
|
||||
IP_SRMNumberMap.put(ip,ID);
|
||||
}
|
||||
|
||||
public static void putSRMNUmber_Ip(String ID,String ip){
|
||||
SRMNumber_IPMap.put(ID,ip);
|
||||
IP_SRMNumberMap.put(ip,ID);
|
||||
}
|
||||
|
||||
public static String getIpFromId(String ID){
|
||||
return SRMNumber_IPMap.get(ID);
|
||||
}
|
||||
|
||||
public static void deleteIp(String ip){
|
||||
IP_SRMNumberMap.remove(ip);
|
||||
}
|
||||
|
||||
public static String getIDFromIp(String ip){
|
||||
return IP_SRMNumberMap.get(ip);
|
||||
}
|
||||
|
||||
public static void connect(String SRMNumber, Channel channel){
|
||||
channelMap.put(SRMNumber,channel);
|
||||
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
|
||||
String clientIp = socketAddress.getAddress().getHostAddress();
|
||||
putSRMNUmber_Ip(SRMNumber, clientIp);
|
||||
channelStringTime.put(SRMNumber,LocalDateTime.now());
|
||||
log.info("connect:{}巷道 ", SRMNumber);
|
||||
}
|
||||
// static {
|
||||
// Timer timer = new Timer();
|
||||
// timer.scheduleAtFixedRate(new TimerTask() {
|
||||
// @Override
|
||||
// public void run() {
|
||||
// for (String key :channelStringTime.keySet()){
|
||||
// if(LocalDateTime.now().equals(channelStringTime.get(key).plusMinutes(5))) {
|
||||
// channelStringTime.remove(key);
|
||||
// disConnect(key);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// },0,60000);
|
||||
// }
|
||||
public static void disConnect(String key){
|
||||
channelMap.remove(key);
|
||||
}
|
||||
|
||||
public static Set<String> keys(){
|
||||
return channelMap.keySet();
|
||||
}
|
||||
|
||||
public static Channel get(String key){
|
||||
return channelMap.get(key);
|
||||
}
|
||||
|
||||
public static void write(String data,String key){
|
||||
if(channelMap.get(key) != null){
|
||||
channelMap.get(key).writeAndFlush(data);
|
||||
}else {
|
||||
tcpLogger.info("no connect client:{}",key);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,131 @@
|
||||
package com.zhehekeji.web.service.client;
|
||||
|
||||
import com.zhehekeji.web.lib.CameraDelayTask;
|
||||
import com.zhehekeji.web.service.EmptyCheckService;
|
||||
import com.zhehekeji.web.service.PlcService;
|
||||
import com.zhehekeji.web.service.ksec.KsecInfo;
|
||||
import com.zhehekeji.web.service.ksec.KsecNettyClient;
|
||||
import com.zhehekeji.web.service.putian.*;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 客户端解码器 连接用
|
||||
*/
|
||||
@Slf4j
|
||||
public class Decoder extends DelimiterBasedFrameDecoder {
|
||||
|
||||
private static final Logger tcpLogger = LoggerFactory.getLogger("tcp");
|
||||
|
||||
public static String START_CHECK = "ST";
|
||||
private static String GET_PHOTO = "GP";
|
||||
private static String GET_PHOTO_END = "GPE";
|
||||
private static String RETURN_CHECK = "RTE";
|
||||
private static String FOLLOW_GET_PHOTO_END = "SGPE";
|
||||
private static String END_CHECK = "EN";
|
||||
private static String HEART_BEAT = "HB";
|
||||
|
||||
|
||||
|
||||
private static String END_STRING = "$";
|
||||
|
||||
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,15,30, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(20000));
|
||||
|
||||
|
||||
private PlcService plcService;
|
||||
|
||||
public Decoder(PlcService plcService) {
|
||||
|
||||
super(20000,true,false, Unpooled.copiedBuffer(">".getBytes()),
|
||||
Unpooled.copiedBuffer("$".getBytes()));
|
||||
this.plcService = plcService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
|
||||
|
||||
in = (ByteBuf) super.decode(ctx, in);
|
||||
if(in == null){
|
||||
log.debug("no data");
|
||||
return null;
|
||||
}
|
||||
ClientRunnable clientRunnable = new ClientRunnable(in,ctx,plcService);
|
||||
threadPoolExecutor.execute(clientRunnable);
|
||||
return null;
|
||||
}
|
||||
|
||||
public static class ClientRunnable implements Runnable {
|
||||
|
||||
private ByteBuf in;
|
||||
|
||||
private ChannelHandlerContext ctx;
|
||||
|
||||
private PlcService plcService;
|
||||
|
||||
public ClientRunnable(ByteBuf in,ChannelHandlerContext ctx,PlcService plcService){
|
||||
this.ctx = ctx;
|
||||
this.in = in;
|
||||
this.plcService = plcService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
String body = in.toString(Charset.forName("UTF-8"));
|
||||
tcpLogger.info("receive client:{}, data length:{}", body, body.length());
|
||||
TransmissionPojo transmissionPojo = new TransmissionPojo(body);
|
||||
//心跳进行连接
|
||||
if(HEART_BEAT.equals(transmissionPojo.getHeader())){
|
||||
ClientChanel.connect(transmissionPojo.getStreetNumber(),ctx.channel());
|
||||
}
|
||||
//获取照片后发送进行计算逻辑,并在拍照队列中取出拍照发送
|
||||
else if(GET_PHOTO_END.equals(transmissionPojo.getHeader())){
|
||||
if(transmissionPojo.getTaskId() != null && !"0".equals(transmissionPojo.getTaskId())) {
|
||||
ClientChanel.get(transmissionPojo.getStreetNumber()).writeAndFlush(transmissionPojo.toString(TransmissionType.RTS));
|
||||
}
|
||||
try {
|
||||
Thread.sleep(50L);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
//删除队列的拍照数据
|
||||
GetPhotoDelayExecutor.remove(transmissionPojo.getStreetNumber(),transmissionPojo.toString(TransmissionType.GPS));
|
||||
//发送给上位机
|
||||
CameraDelayTask cameraDelayTask = GetPhotoDelayExecutor.getNext(transmissionPojo.getStreetNumber());
|
||||
if (cameraDelayTask != null) {
|
||||
TransmissionPojo pojo = new TransmissionPojo(cameraDelayTask.getCommand());
|
||||
ClientChanel.get(pojo.getStreetNumber()).writeAndFlush(pojo.toString(TransmissionType.GPS));
|
||||
}
|
||||
|
||||
}
|
||||
//照片和结果保存,并发送给上位机
|
||||
else if(RETURN_CHECK.equals(transmissionPojo.getHeader())){
|
||||
//保存数据
|
||||
plcService.visualCalculationResults(transmissionPojo);
|
||||
//发送给上位机
|
||||
KsecInfo ksecInfo = plcService.getKsecDataInfo(transmissionPojo,"E");
|
||||
ksecInfo.getData().setTypeNum(transmissionPojo.getCategory());
|
||||
ksecInfo.getData().setQuantity(transmissionPojo.getCount());
|
||||
ksecInfo.getData().setCheckRlt(transmissionPojo.getResult());
|
||||
KsecNettyClient.write(ksecInfo);
|
||||
|
||||
|
||||
|
||||
}
|
||||
//照片和结果保存,并发送给上位机
|
||||
else if(FOLLOW_GET_PHOTO_END.equals(transmissionPojo.getHeader())){
|
||||
//保存数据
|
||||
plcService.saveFollowPhoto(transmissionPojo);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,4 @@
|
||||
package com.zhehekeji.web.service.client;
|
||||
|
||||
public class IntelliBlinkClient {
|
||||
}
|
||||
@ -0,0 +1,44 @@
|
||||
package com.zhehekeji.web.service.client;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
/**
|
||||
* 客户端的上下线
|
||||
*
|
||||
* @author Administrator
|
||||
*
|
||||
*/
|
||||
@Slf4j
|
||||
public class NettyConnectHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
/**
|
||||
* 建立连接时
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = socketAddress.getAddress().getHostAddress();
|
||||
int clientPort = socketAddress.getPort();
|
||||
log.info("ip:{} port:{} connected",clientIp, clientPort);
|
||||
ctx.fireChannelActive();
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭连接时
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
String clientIp = socketAddress.getAddress().getHostAddress();
|
||||
int clientPort = socketAddress.getPort();
|
||||
log.info("ip:{} port:{} disconnected",clientIp, clientPort);
|
||||
String ID = ClientChanel.getIDFromIp(clientIp);
|
||||
//设置客户端下线
|
||||
ClientChanel.disConnect(ID);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
package com.zhehekeji.web.service.client;
|
||||
|
||||
public enum TransmissionType {
|
||||
ST ,
|
||||
GPS ,
|
||||
GPE ,
|
||||
SGPS ,
|
||||
SGPE ,
|
||||
RTS ,
|
||||
RTE ,
|
||||
EN ,
|
||||
ZHB ;
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue