@ -1,8 +1,11 @@
package com.zhehekeji.web.service.client ;
import com.zhehekeji.web.entity.Stock ;
import com.zhehekeji.web.lib.CameraDelayTask ;
import com.zhehekeji.web.service.EmptyCheckService ;
import com.zhehekeji.web.service.PlcService ;
import com.zhehekeji.web.service.ksec.* ;
import com.zhehekeji.web.service.ksec.KsecInfo ;
import com.zhehekeji.web.service.ksec.KsecNettyClient ;
import com.zhehekeji.web.service.putian.* ;
import io.netty.buffer.ByteBuf ;
import io.netty.buffer.Unpooled ;
import io.netty.channel.ChannelHandlerContext ;
@ -12,15 +15,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory ;
import java.nio.charset.Charset ;
import java.time.LocalDateTime ;
import java.util.List ;
import java.util.concurrent.ArrayBlockingQueue ;
import java.util.concurrent.ThreadPoolExecutor ;
import java.util.concurrent.TimeUnit ;
import static com.zhehekeji.web.service.ksec.KsecDecoder.ksecInfoMap ;
import static org.aspectj.weaver.tools.cache.SimpleCacheFactory.path ;
/ * *
* 客 户 端 解 码 器 连 接 用
* /
@ -29,20 +27,32 @@ public class Decoder extends DelimiterBasedFrameDecoder {
private static final Logger tcpLogger = LoggerFactory . getLogger ( "tcp" ) ;
public static String PT_CLIENT = "PT" ;
private static String WCS_CLIENT = "WCS" ;
private static String EMPTY_CLIENT = "EMPTY" ;
public static String START_CHECK = "ST" ;
private static String GET_PHOTO = "GP" ;
private static String GET_PHOTO_END = "GPE" ;
private static String RETURN_CHECK = "RTE" ;
private static String FOLLOW_GET_PHOTO_END = "SGPE" ;
private static String END_CHECK = "EN" ;
private static String HEART_BEAT = "HB" ;
private static String CONNECT_START = "CE" ;
private static String END_STRING = "$" ;
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor ( 5 , 15 , 30 , TimeUnit . MILLISECONDS , new ArrayBlockingQueue < > ( 20000 ) ) ;
private EmptyCheckService emptyCheckService ;
private PlcService plcService ;
public Decoder ( PlcService plcService ) {
public Decoder ( PlcService plcService ,EmptyCheckService emptyCheckService ) {
super ( 20000 , true , false , Unpooled . copiedBuffer ( ">" . getBytes ( ) ) ,
Unpooled . copiedBuffer ( "$" . getBytes ( ) ) ) ;
this . plcService = plcService ;
this . emptyCheckService = emptyCheckService ;
}
@Override
@ -53,7 +63,7 @@ public class Decoder extends DelimiterBasedFrameDecoder {
log . debug ( "no data" ) ;
return null ;
}
ClientRunnable clientRunnable = new ClientRunnable ( in , ctx , plcService );
ClientRunnable clientRunnable = new ClientRunnable ( in , ctx , plcService ,emptyCheckService );
threadPoolExecutor . execute ( clientRunnable ) ;
return null ;
}
@ -66,80 +76,66 @@ public class Decoder extends DelimiterBasedFrameDecoder {
private PlcService plcService ;
private EmptyCheckService emptyCheckService ;
public ClientRunnable ( ByteBuf in , ChannelHandlerContext ctx , PlcService plcService ){
public ClientRunnable ( ByteBuf in , ChannelHandlerContext ctx , PlcService plcService ,EmptyCheckService emptyCheckService ){
this . ctx = ctx ;
this . in = in ;
this . plcService = plcService ;
this . emptyCheckService = emptyCheckService ;
}
@Override
public void run ( ) {
String body = in . toString ( Charset . forName ( "UTF-8" ) ) ;
tcpLogger . info ( "receive client:{}, data length:{}" , body , body . length ( ) ) ;
//视觉服务
{
if ( body . startsWith ( HBTransmission . getHEADER ( ) ) ) {
//心跳
HBTransmission hbTransmission = new HBTransmission ( body ) ;
//回复客户端心跳
ctx . channel ( ) . writeAndFlush ( hbTransmission . toString ( ) ) ;
ClientChanel . connect ( hbTransmission . getSRMNumber ( ) , ctx . channel ( ) ) ;
//tcpLogger.info("client:{} heart", hbTransmission.getSRMNumber());
in . release ( ) ;
} else if ( body . startsWith ( SCTransmission . getHeader ( ) ) ) {
//盘点指令
SCTransmission scTransmission = new SCTransmission ( body ) ;
if ( scTransmission . isCollectOver ( ) ) {
//给普天发送数据采集完毕指令
KsecDataInfo ksecDataInfo = ksecInfoMap . get ( scTransmission . getTaskNo ( ) ) ;
ksecDataInfo . setCmdName ( "E2" ) ;
KsecInfo ksecInfo = new KsecInfo ( "KC" , "E" , ksecDataInfo ) ;
KsecNettyClient . write ( ksecInfo ) ;
} else {
plcService . visualInventory ( scTransmission ) ;
KsecDataInfo ksecDataInfo = ksecInfoMap . get ( scTransmission . getTaskNo ( ) ) ;
ksecDataInfo . setCmdName ( "E3" ) ;
ksecDataInfo . setQuantity ( scTransmission . getRstCount ( ) ) ;
ksecDataInfo . setTypeNum ( scTransmission . getRstCategory ( ) ) ;
ksecDataInfo . setCheckRlt ( Integer . parseInt ( scTransmission . getCheckRst ( ) ) ) ;
KsecInfo ksecInfo = new KsecInfo ( "KC" , "E" , ksecDataInfo ) ;
KsecNettyClient . write ( ksecInfo ) ;
ksecInfoMap . remove ( scTransmission . getTaskNo ( ) ) ;
//添加到实时信息里
//RealtimeCheckMap.put(scTransmission.getSRMNumber(),scTransmission.checkInfo());
//更新盤點統計
// emptyCheckService.updateCheckLastTime(tmTransmission.getTaskNo(),tmTransmission.getSRMNumber(),stock.getCode());
}
in . release ( ) ;
} else if ( body . startsWith ( CETransmission . getHEADER ( ) ) ) {
//客户端建立连接
CETransmission ceTransmission = new CETransmission ( body ) ;
//回复客户端,建立连接完成
ctx . channel ( ) . writeAndFlush ( ceTransmission . toString ( ) ) ;
ClientChanel . connect ( ceTransmission . getSRMNumber ( ) , ctx . channel ( ) ) ;
tcpLogger . info ( "client:{} connect" , ceTransmission . getSRMNumber ( ) ) ;
in . release ( ) ;
tcpLogger . info ( "receive client:{}, data length:{}" , body , body . length ( ) ) ;
TransmissionPojo transmissionPojo = new TransmissionPojo ( body ) ;
//心跳进行连接
if ( HEART_BEAT . equals ( transmissionPojo . getHeader ( ) ) | | CONNECT_START . equals ( transmissionPojo . getHeader ( ) ) ) {
ClientChanel . connect ( transmissionPojo . getStreetNumber ( ) , ctx . channel ( ) ) ;
}
//获取照片后发送进行计算逻辑,并在拍照队列中取出拍照发送
else if ( GET_PHOTO_END . equals ( transmissionPojo . getHeader ( ) ) ) {
if ( transmissionPojo . getTaskId ( ) ! = null & & ! "0" . equals ( transmissionPojo . getTaskId ( ) ) ) {
ClientChanel . get ( transmissionPojo . getStreetNumber ( ) ) . writeAndFlush ( transmissionPojo . toString ( TransmissionType . RTS ) ) ;
// GetPhotoDelayExecutor.addCameraDelayTask(transmissionPojo.getStreetNumber(), transmissionPojo.toString(TransmissionType.RTS), 3000);
}
else if ( body . startsWith ( "DC" ) ) {
//客户端断开连接
String [ ] strings = body . split ( "&" ) ;
if ( strings ! = null & & strings . length = = 2 ) {
tcpLogger . info ( "client:{} disConnect" , strings [ 1 ] ) ;
ClientChanel . disConnect ( strings [ 1 ] ) ;
}
in . release ( ) ;
try {
Thread . sleep ( 50L ) ;
} catch ( InterruptedException e ) {
throw new RuntimeException ( e ) ;
}
//删除队列的拍照数据
GetPhotoDelayExecutor . remove ( transmissionPojo . getStreetNumber ( ) , transmissionPojo ) ;
//读取下一个发送
CameraDelayTask cameraDelayTask = GetPhotoDelayExecutor . nextOne ( transmissionPojo . getStreetNumber ( ) , transmissionPojo . toString ( TransmissionType . GPS ) ) ;
if ( cameraDelayTask ! = null ) {
TransmissionPojo pojo = new TransmissionPojo ( cameraDelayTask . getCommand ( ) ) ;
ClientChanel . get ( pojo . getStreetNumber ( ) ) . writeAndFlush ( pojo . toString ( TransmissionType . GPS ) ) ;
}
// else if (body.contains("EMPTY_CLIENT")){
// ClientChanel.connect(EMPTY_CLIENT_NAME, ctx.channel());
// tcpLogger.info("client:{} connect", EMPTY_CLIENT_NAME);
// in.release();
// }
}
//照片和结果保存,并发送给上位机
else if ( RETURN_CHECK . equals ( transmissionPojo . getHeader ( ) ) ) {
//保存数据
plcService . visualCalculationResults ( transmissionPojo ) ;
//发送给上位机
KsecInfo ksecInfo = plcService . getKsecDataInfo ( transmissionPojo , "E" ) ;
ksecInfo . getData ( ) . setTypeNum ( transmissionPojo . getCategory ( ) ) ;
ksecInfo . getData ( ) . setQuantity ( transmissionPojo . getCount ( ) ) ;
ksecInfo . getData ( ) . setCheckRlt ( transmissionPojo . getResult ( ) ) ;
KsecNettyClient . write ( ksecInfo ) ;
GetPhotoDelayExecutor . removeTask ( transmissionPojo . getStreetNumber ( ) , transmissionPojo ) ;
}
//照片和结果保存,并发送给上位机
else if ( FOLLOW_GET_PHOTO_END . equals ( transmissionPojo . getHeader ( ) ) ) {
//保存数据
plcService . saveFollowPhoto ( transmissionPojo ) ;
}
}
}
}