|
|
|
@ -1,64 +1,158 @@
|
|
|
|
package com.zhehekeji.web.lib;
|
|
|
|
package com.zhehekeji.web.lib;
|
|
|
|
|
|
|
|
|
|
|
|
import com.zhehekeji.common.util.SpringContextUtil;
|
|
|
|
import com.zhehekeji.common.util.SpringContextUtil;
|
|
|
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
|
|
|
|
|
import org.springframework.scheduling.annotation.Scheduled;
|
|
|
|
|
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import javax.annotation.PostConstruct;
|
|
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
import java.util.concurrent.DelayQueue;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
import java.util.concurrent.ScheduledExecutorService;
|
|
|
|
import java.util.concurrent.Executors;
|
|
|
|
import java.util.concurrent.ScheduledFuture;
|
|
|
|
|
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Slf4j
|
|
|
|
|
|
|
|
@Component
|
|
|
|
public class TaskDelayExecutor {
|
|
|
|
public class TaskDelayExecutor {
|
|
|
|
|
|
|
|
// 使用Spring管理的线程池(ThreadPoolTaskExecutor)
|
|
|
|
|
|
|
|
@Autowired
|
|
|
|
|
|
|
|
@Qualifier("scheduledExecutorService")
|
|
|
|
|
|
|
|
private ScheduledExecutorService scheduledExecutorService;
|
|
|
|
|
|
|
|
|
|
|
|
private static ExecutorService exec = Executors.newFixedThreadPool(1);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static DelayQueue<CameraDelayTask> queue = new DelayQueue<>();
|
|
|
|
@Resource
|
|
|
|
|
|
|
|
CameraControlModule cameraControlModule;
|
|
|
|
|
|
|
|
|
|
|
|
public static void addMp4DelayTask(Integer cameraId, String path, LocalDateTime startTime, LocalDateTime endTime, Long delayTime) {
|
|
|
|
// 使用ConcurrentHashMap存储任务,替代DelayQueue
|
|
|
|
CameraDelayTask cameraDelayTask = new CameraDelayTask(cameraId, startTime, endTime,path, 0,delayTime);
|
|
|
|
private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledTasks = new ConcurrentHashMap<>();
|
|
|
|
queue.add(cameraDelayTask);
|
|
|
|
|
|
|
|
}
|
|
|
|
// 任务ID生成器
|
|
|
|
|
|
|
|
private static final AtomicLong taskIdGenerator = new AtomicLong(0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void addMp4DelayTask(Integer cameraId, String path, LocalDateTime startTime, LocalDateTime endTime, Long delayTime) {
|
|
|
|
|
|
|
|
String taskId = "mp4_" + cameraId + "_" + taskIdGenerator.incrementAndGet();
|
|
|
|
|
|
|
|
log.info("添加延时录像任务: taskId={}, cameraId={}, path={}, delayTime={}ms", taskId, cameraId, path, delayTime);
|
|
|
|
|
|
|
|
|
|
|
|
public static void addPicDelayTask(Integer cameraId, String path, Long delayTime) {
|
|
|
|
ScheduledFuture<?> scheduledFuture = getExecutorService().schedule(() -> {
|
|
|
|
CameraDelayTask cameraDelayTask = new CameraDelayTask(cameraId, null, null,path, 1,delayTime);
|
|
|
|
try {
|
|
|
|
queue.add(cameraDelayTask);
|
|
|
|
log.info("开始执行延时录像任务: taskId={}, cameraId={}, path={}", taskId, cameraId, path);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cameraControlModule.downloadMp4(cameraId, path, startTime, endTime);
|
|
|
|
|
|
|
|
// 任务执行完成后从map中移除
|
|
|
|
|
|
|
|
scheduledTasks.remove(taskId);
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
|
|
log.error("执行延时录像任务时发生异常: taskId={}, cameraId={}, path={}, error={}", taskId, cameraId, path, e.getMessage(), e);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}, delayTime, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
scheduledTasks.put(taskId, scheduledFuture);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public static void addGyrateCameraTask(Integer cameraId, Long delayTime,Integer ptzId){
|
|
|
|
public void addPicDelayTask(Integer cameraId, String path, Long delayTime) {
|
|
|
|
CameraDelayTask cameraDelayTask = new CameraDelayTask(cameraId, null, null,null, 2,delayTime);
|
|
|
|
String taskId = "pic_" + cameraId + "_" + taskIdGenerator.incrementAndGet();
|
|
|
|
cameraDelayTask.setPtzId(ptzId);
|
|
|
|
log.info("添加延时拍照任务: taskId={}, cameraId={}, path={}, delayTime={}ms", taskId, cameraId, path, delayTime);
|
|
|
|
queue.add(cameraDelayTask);
|
|
|
|
|
|
|
|
|
|
|
|
ScheduledFuture<?> scheduledFuture = getExecutorService().schedule(() -> {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
log.info("开始执行延时拍照任务: taskId={}, cameraId={}, path={}", taskId, cameraId, path);
|
|
|
|
|
|
|
|
cameraControlModule.pic(cameraId, 0, path);
|
|
|
|
|
|
|
|
// 任务执行完成后从map中移除
|
|
|
|
|
|
|
|
scheduledTasks.remove(taskId);
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
|
|
log.error("执行延时拍照任务时发生异常: taskId={}, cameraId={}, path={}, error={}", taskId, cameraId, path, e.getMessage(), e);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}, delayTime, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
scheduledTasks.put(taskId, scheduledFuture);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public static void runMp4DownloadExecutor(){
|
|
|
|
public void addGyrateCameraTask(Integer cameraId, Long delayTime, Integer ptzId) {
|
|
|
|
exec.execute(new Consumer());
|
|
|
|
String taskId = "gyrate_" + cameraId + "_" + ptzId + "_" + taskIdGenerator.incrementAndGet();
|
|
|
|
|
|
|
|
log.info("添加转至预置点任务: taskId={}, cameraId={}, ptzId={}, delayTime={}ms", taskId, cameraId, ptzId, delayTime);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ScheduledFuture<?> scheduledFuture = getExecutorService().schedule(() -> {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
log.info("开始执行转至预置点任务: taskId={}, cameraId={}, ptzId={}", taskId, cameraId, ptzId);
|
|
|
|
|
|
|
|
cameraControlModule.toPtz(ptzId, cameraId);
|
|
|
|
|
|
|
|
// 任务执行完成后从map中移除
|
|
|
|
|
|
|
|
scheduledTasks.remove(taskId);
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
|
|
log.error("执行转至预置点任务时发生异常: taskId={}, cameraId={}, ptzId={}, error={}", taskId, cameraId, ptzId, e.getMessage(), e);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}, delayTime, TimeUnit.MILLISECONDS);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
scheduledTasks.put(taskId, scheduledFuture);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private static class Consumer implements Runnable {
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 取消指定任务
|
|
|
|
@Override
|
|
|
|
* @param taskId 任务ID
|
|
|
|
public void run() {
|
|
|
|
* @return 是否成功取消
|
|
|
|
while (true) {
|
|
|
|
*/
|
|
|
|
try {
|
|
|
|
public static boolean cancelTask(String taskId) {
|
|
|
|
CameraControlModule cameraControlModule = SpringContextUtil.getBean(CameraControlModule.class);
|
|
|
|
ScheduledFuture<?> scheduledFuture = scheduledTasks.get(taskId);
|
|
|
|
CameraDelayTask cameraDelayTask = queue.take();
|
|
|
|
if (scheduledFuture != null) {
|
|
|
|
if(cameraDelayTask != null){
|
|
|
|
boolean cancelled = scheduledFuture.cancel(false);
|
|
|
|
|
|
|
|
if (cancelled) {
|
|
|
|
if(cameraDelayTask.getType() == 0){
|
|
|
|
scheduledTasks.remove(taskId);
|
|
|
|
|
|
|
|
log.info("成功取消任务: {}", taskId);
|
|
|
|
cameraControlModule.downloadMp4(cameraDelayTask.getCameraId(), cameraDelayTask.getPath(), cameraDelayTask.getStartTime(), cameraDelayTask.getEndTime());
|
|
|
|
} else {
|
|
|
|
}else if(cameraDelayTask.getType() == 1){
|
|
|
|
log.warn("取消任务失败: {}", taskId);
|
|
|
|
cameraControlModule.pic(cameraDelayTask.getCameraId(),0, cameraDelayTask.getPath());
|
|
|
|
|
|
|
|
}else if(cameraDelayTask.getType() == 2){
|
|
|
|
|
|
|
|
cameraControlModule.toPtz(cameraDelayTask.getPtzId(),cameraDelayTask.getCameraId());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return cancelled;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
log.warn("未找到任务: {}", taskId);
|
|
|
|
|
|
|
|
return false;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 获取当前等待执行的任务数量
|
|
|
|
|
|
|
|
* @return 任务数量
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
public static int getPendingTaskCount() {
|
|
|
|
|
|
|
|
return scheduledTasks.size();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 清除所有任务
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
public static void clearAllTasks() {
|
|
|
|
|
|
|
|
scheduledTasks.values().forEach(future -> future.cancel(false));
|
|
|
|
|
|
|
|
scheduledTasks.clear();
|
|
|
|
|
|
|
|
log.info("已清除所有延时任务");
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 获取线程池实例
|
|
|
|
|
|
|
|
* @return 线程池实例
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
private ScheduledExecutorService getExecutorService() {
|
|
|
|
|
|
|
|
TaskDelayExecutor bean = SpringContextUtil.getBean(TaskDelayExecutor.class);
|
|
|
|
|
|
|
|
if (bean != null && bean.scheduledExecutorService != null) {
|
|
|
|
|
|
|
|
return bean.scheduledExecutorService;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 备用方案:创建独立的调度线程池
|
|
|
|
|
|
|
|
log.warn("无法获取Spring管理的线程池,使用备用线程池");
|
|
|
|
|
|
|
|
return new ScheduledThreadPoolExecutor(5, r -> {
|
|
|
|
|
|
|
|
Thread t = new Thread(r, "TaskDelayExecutor-Backup-" + taskIdGenerator.incrementAndGet());
|
|
|
|
|
|
|
|
t.setDaemon(false);
|
|
|
|
|
|
|
|
return t;
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* 定期输出任务统计信息
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
@Scheduled(fixedRate = 60000) // 每分钟执行一次
|
|
|
|
|
|
|
|
public void logTaskStatistics() {
|
|
|
|
|
|
|
|
log.info("当前等待执行的延时任务数量: {}", scheduledTasks.size());
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|