初始化仓库
This commit is contained in:
40
xtools-boot-mq/xtools-boot-mq-base/pom.xml
Normal file
40
xtools-boot-mq/xtools-boot-mq-base/pom.xml
Normal file
@@ -0,0 +1,40 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.xujun</groupId>
|
||||
<artifactId>xtools-boot-mq</artifactId>
|
||||
<version>5.0.0</version>
|
||||
</parent>
|
||||
<artifactId>xtools-boot-mq-base</artifactId>
|
||||
|
||||
<!-- 依赖 -->
|
||||
<dependencies>
|
||||
<!-- xtools-boot begin -->
|
||||
<!-- xtools-boot-core -->
|
||||
<dependency>
|
||||
<groupId>org.xujun</groupId>
|
||||
<artifactId>xtools-boot-core</artifactId>
|
||||
</dependency>
|
||||
<!-- xtools-boot-log -->
|
||||
<dependency>
|
||||
<groupId>org.xujun</groupId>
|
||||
<artifactId>xtools-boot-log</artifactId>
|
||||
</dependency>
|
||||
<!-- xtools-boot-thread -->
|
||||
<dependency>
|
||||
<groupId>org.xujun</groupId>
|
||||
<artifactId>xtools-boot-thread</artifactId>
|
||||
</dependency>
|
||||
<!-- xtools-boot end -->
|
||||
|
||||
<!-- fastjson2 -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.fastjson2</groupId>
|
||||
<artifactId>fastjson2</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,28 @@
|
||||
package xtools.boot.mq.base;
|
||||
|
||||
import org.springframework.context.annotation.Import;
|
||||
import xtools.boot.core.utils.ModuleLoadUtils;
|
||||
import xtools.boot.mq.base.selector.BootMqBaseImportSelector;
|
||||
|
||||
/**
|
||||
* <p>Title : BootMqBaseConfiguration</p>
|
||||
* <p>Description : BootMqBaseConfiguration</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/01/01 09:30
|
||||
*/
|
||||
@Import(BootMqBaseImportSelector.class)
|
||||
public class BootMqBaseConfiguration {
|
||||
|
||||
/**
|
||||
* 构造方法
|
||||
*/
|
||||
public BootMqBaseConfiguration() {
|
||||
ModuleLoadUtils.loadSuccess(BootMqBaseConfiguration.class);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package xtools.boot.mq.base.enums;
|
||||
|
||||
/**
|
||||
* <p>Title : MqEnums</p>
|
||||
* <p>Description : MqEnums</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/5 16:46
|
||||
*/
|
||||
public interface MqEnums {
|
||||
|
||||
/**
|
||||
* 获取队列名称
|
||||
*
|
||||
* @return 队列名称
|
||||
*/
|
||||
String queue();
|
||||
|
||||
/**
|
||||
* 是否保存日志
|
||||
*
|
||||
* @return 是否保存日志
|
||||
*/
|
||||
boolean saveLog();
|
||||
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package xtools.boot.mq.base.handle;
|
||||
|
||||
/**
|
||||
* <p>Title : BaseErrorHandle</p>
|
||||
* <p>Description : BaseErrorHandle</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/12 10:46
|
||||
*/
|
||||
public interface BaseErrorHandle {
|
||||
|
||||
/**
|
||||
* 消息处理错误异常(需要消息队列重推消息,抛出异常即可)
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @param exception 异常信息
|
||||
*/
|
||||
void messageHandle(String message, Exception exception);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,133 @@
|
||||
package xtools.boot.mq.base.handle;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import xtools.boot.api.model.dto.log.LogTrack;
|
||||
import xtools.boot.core.utils.SpringContextUtils;
|
||||
import xtools.boot.log.LogBus;
|
||||
import xtools.boot.log.enums.LogBusBaseType;
|
||||
import xtools.boot.log.holder.LogTrackHolder;
|
||||
import xtools.boot.mq.base.enums.MqEnums;
|
||||
import xtools.boot.mq.base.model.dto.MessageDto;
|
||||
import xtools.boot.mq.base.utils.MqMessageUtils;
|
||||
import xtools.core.enums.LogLevel;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* <p>Title : BaseMessageHandle</p>
|
||||
* <p>Description : BaseMessageHandle</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/11 17:03
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class BaseMessageHandle<T> {
|
||||
|
||||
/**
|
||||
* 消息泛型类
|
||||
*/
|
||||
private final Class<T> clazz;
|
||||
|
||||
/**
|
||||
* 构造方法
|
||||
*
|
||||
* @param clazz 泛型类
|
||||
*/
|
||||
public BaseMessageHandle(Class<T> clazz) {
|
||||
this.clazz = clazz;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取队列
|
||||
*
|
||||
* @return 队列
|
||||
*/
|
||||
public abstract MqEnums queue();
|
||||
|
||||
/**
|
||||
* 获取监听方法
|
||||
*
|
||||
* @return 监听方法
|
||||
*/
|
||||
public String listenerMethod() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取参数
|
||||
*
|
||||
* @return 参数
|
||||
*/
|
||||
public Object params() {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 自定义处理消息
|
||||
*
|
||||
* @param message 消息内容
|
||||
* @throws Exception 异常信息
|
||||
*/
|
||||
public abstract void handleMessage(T message) throws Exception;
|
||||
|
||||
/**
|
||||
* 基础消息处理
|
||||
*
|
||||
* @param message 消息内容
|
||||
*/
|
||||
public void baseHandleMessage(String message) {
|
||||
// 解包消息
|
||||
MessageDto<T> msg = MqMessageUtils.from(message, clazz);
|
||||
final T data = msg.getData();
|
||||
if (Objects.isNull(data)) {
|
||||
log.warn("消息内容为空");
|
||||
return;
|
||||
}
|
||||
AtomicReference<Exception> err = new AtomicReference<>();
|
||||
LogTrack logTrack = msg.getLog();
|
||||
if (Objects.isNull(logTrack)) {
|
||||
try {
|
||||
handleMessage(data);
|
||||
} catch (Exception e) {
|
||||
err.set(e);
|
||||
}
|
||||
} else {
|
||||
ScopedValue.where(LogTrackHolder.getScoped(), LogTrackHolder.newHolderLogTrack(logTrack)).run(() -> {
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
if (logTrack.save()) {
|
||||
LogBus.init(LogBusBaseType.MQ).title("MQ接收").data(msg).save();
|
||||
}
|
||||
handleMessage(data);
|
||||
} catch (Exception e) {
|
||||
err.set(e);
|
||||
} finally {
|
||||
if (logTrack.save()) {
|
||||
long endTime = System.currentTimeMillis();
|
||||
LogLevel level = Objects.isNull(err.get()) ? LogLevel.INFO : LogLevel.ERROR;
|
||||
LogBus.init(level, LogBusBaseType.MQ)
|
||||
.title("MQ处理完成")
|
||||
.error(err.get())
|
||||
.data(JSONObject.of("execTime", endTime - startTime))
|
||||
.save();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Exception exception = err.get();
|
||||
if (Objects.isNull(exception)) {
|
||||
return;
|
||||
}
|
||||
log.error("消息处理异常", exception);
|
||||
BaseErrorHandle errorHandle = SpringContextUtils.getBeanDefNull(BaseErrorHandle.class);
|
||||
if (Objects.nonNull(errorHandle)) {
|
||||
errorHandle.messageHandle(message, exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package xtools.boot.mq.base.handle;
|
||||
|
||||
/**
|
||||
* <p>Title : BaseMqHandle</p>
|
||||
* <p>Description : BaseMqHandle</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/12 09:23
|
||||
*/
|
||||
public interface BaseMqHandle {
|
||||
|
||||
/**
|
||||
* 初始化消息队列
|
||||
*
|
||||
* @param queueName 消息队列名称
|
||||
*/
|
||||
void initQueue(String queueName);
|
||||
|
||||
/**
|
||||
* 添加消息队列监听
|
||||
*
|
||||
* @param queueName 消息队列
|
||||
* @param delegate 监听类
|
||||
* @param listenerMethod 监听方法
|
||||
* @param params 参数
|
||||
*/
|
||||
void addListen(String queueName, Object delegate, String listenerMethod, Object params);
|
||||
|
||||
/**
|
||||
* 推送消息
|
||||
*
|
||||
* @param routingKey 路由 key
|
||||
* @param data 消息数据
|
||||
*/
|
||||
void push(String routingKey, Object data);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,80 @@
|
||||
package xtools.boot.mq.base.init;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jspecify.annotations.NonNull;
|
||||
import org.springframework.boot.ApplicationArguments;
|
||||
import org.springframework.boot.ApplicationRunner;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import xtools.base.config.BaseParams;
|
||||
import xtools.base.exception.CommonException;
|
||||
import xtools.boot.api.enums.BootError;
|
||||
import xtools.boot.core.utils.SpringContextUtils;
|
||||
import xtools.boot.log.LogBus;
|
||||
import xtools.boot.log.enums.LogBusBaseType;
|
||||
import xtools.boot.log.holder.LogTrackHolder;
|
||||
import xtools.boot.mq.base.handle.BaseMessageHandle;
|
||||
import xtools.boot.mq.base.handle.BaseMqHandle;
|
||||
import xtools.core.CollectionUtils;
|
||||
import xtools.core.enums.LogLevel;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
/**
|
||||
* <p>Title : InitMq</p>
|
||||
* <p>Description : InitMq</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/2 14:13
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@Order(BaseParams.CP_NUM50)
|
||||
public class InitMq implements ApplicationRunner {
|
||||
|
||||
@Override
|
||||
public void run(@NonNull ApplicationArguments args) {
|
||||
ScopedValue.where(LogTrackHolder.getScoped(), LogTrackHolder.newMain()).run(() -> {
|
||||
try {
|
||||
init();
|
||||
} catch (Exception e) {
|
||||
LogBus.init(LogLevel.ERROR, LogBusBaseType.MQ).title("初始化异常").error(e).save();
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 初始化
|
||||
*/
|
||||
@SuppressWarnings("rawtypes")
|
||||
private void init() {
|
||||
BaseMqHandle mqHandle = SpringContextUtils.getBeanDefNull(BaseMqHandle.class);
|
||||
if (Objects.isNull(mqHandle)) {
|
||||
throw CommonException.create(BootError.MQ, "没有找到MQ处理适配器,请添加对应Jar包");
|
||||
}
|
||||
|
||||
Collection<BaseMessageHandle> beanList = SpringContextUtils.getBeanList(BaseMessageHandle.class);
|
||||
if (CollectionUtils.isEmpty(beanList)) {
|
||||
log.info("没有找到MQ的消息处理适配器");
|
||||
return;
|
||||
}
|
||||
beanList.forEach(item -> {
|
||||
// 获取队列名称
|
||||
String queueName = item.queue().queue();
|
||||
try {
|
||||
mqHandle.initQueue(queueName);
|
||||
mqHandle.addListen(queueName, item, item.listenerMethod(), item.params());
|
||||
log.info("消息队列监听[{}]初始化成功", queueName);
|
||||
} catch (Exception e) {
|
||||
LogBus.init(LogLevel.ERROR, LogBusBaseType.MQ).title("初始化" + queueName + "异常").error(e).save();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package xtools.boot.mq.base.model.dto;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import xtools.boot.api.model.dto.log.LogTrack;
|
||||
|
||||
/**
|
||||
* <p>Title : MessageDto</p>
|
||||
* <p>Description : MessageDto</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/11 20:06
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
public class MessageDto<T> {
|
||||
|
||||
/**
|
||||
* 日志追踪信息
|
||||
*/
|
||||
private LogTrack log;
|
||||
|
||||
/**
|
||||
* 消息内容
|
||||
*/
|
||||
private T data;
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package xtools.boot.mq.base.selector;
|
||||
|
||||
import org.jspecify.annotations.NonNull;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
|
||||
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
|
||||
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
|
||||
import org.springframework.core.type.AnnotationMetadata;
|
||||
|
||||
/**
|
||||
* <p>Title : BootMqBaseImportSelector</p>
|
||||
* <p>Description : BootMqBaseImportSelector</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/01/01 09:30
|
||||
*/
|
||||
public class BootMqBaseImportSelector implements ImportBeanDefinitionRegistrar {
|
||||
|
||||
/**
|
||||
* 根据给定的注释元数据,根据需要注册bean
|
||||
*
|
||||
* @param importingClassMetadata AnnotationMetadata
|
||||
* @param registry BeanDefinitionRegistry
|
||||
*/
|
||||
@Override
|
||||
public void registerBeanDefinitions(@NonNull AnnotationMetadata importingClassMetadata, @NonNull BeanDefinitionRegistry registry) {
|
||||
// 构建扫描对象
|
||||
ClassPathBeanDefinitionScanner scanner = new ClassPathBeanDefinitionScanner(registry, true);
|
||||
// 扫描包下路径
|
||||
scanner.scan("xtools.boot.mq.base");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
package xtools.boot.mq.base.utils;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import xtools.boot.api.exection.BizError;
|
||||
import xtools.boot.api.model.dto.log.LogTrack;
|
||||
import xtools.boot.core.utils.SpringContextUtils;
|
||||
import xtools.boot.log.LogBus;
|
||||
import xtools.boot.log.enums.LogBusBaseType;
|
||||
import xtools.boot.log.holder.LogTrackHolder;
|
||||
import xtools.boot.mq.base.enums.MqEnums;
|
||||
import xtools.boot.mq.base.handle.BaseMqHandle;
|
||||
import xtools.boot.thread.utils.VirtualThreadTaskUtils;
|
||||
import xtools.core.enums.LogLevel;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* <p>Title : MqBus</p>
|
||||
* <p>Description : MqBus</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/12 10:00
|
||||
*/
|
||||
public class MqBus {
|
||||
|
||||
/**
|
||||
* 推送到消息队列
|
||||
*
|
||||
* @param mq 队列
|
||||
* @param data 数据
|
||||
*/
|
||||
public static void push(MqEnums mq, Object data) {
|
||||
BaseMqHandle mqHandle = SpringContextUtils.getBeanDefNull(BaseMqHandle.class);
|
||||
if (Objects.isNull(mqHandle)) {
|
||||
throw new BizError("没有找到MQ处理适配器,请添加对应Jar包");
|
||||
}
|
||||
// 是否保存日志
|
||||
boolean saveLog = mq.saveLog();
|
||||
LogTrack logTrack;
|
||||
if (saveLog) {
|
||||
logTrack = LogTrackHolder.getDefNull();
|
||||
if (Objects.isNull(logTrack)) {
|
||||
saveLog = false;
|
||||
} else {
|
||||
saveLog = logTrack.save();
|
||||
}
|
||||
}
|
||||
if (saveLog) {
|
||||
VirtualThreadTaskUtils.execute(() -> {
|
||||
LogTrack track = LogTrackHolder.get();
|
||||
JSONObject message = MqMessageUtils.to(data, track);
|
||||
LogBus.init(LogLevel.INFO, LogBusBaseType.MQ, track).title("MQ推送").data(message).save();
|
||||
mqHandle.push(mq.queue(), message.toString());
|
||||
});
|
||||
} else {
|
||||
VirtualThreadTaskUtils.simple(() -> mqHandle.push(mq.queue(), MqMessageUtils.to(data, null).toString()));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
package xtools.boot.mq.base.utils;
|
||||
|
||||
import com.alibaba.fastjson2.JSONObject;
|
||||
import xtools.boot.api.model.dto.log.LogTrack;
|
||||
import xtools.boot.mq.base.model.dto.MessageDto;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
|
||||
* <p>Title : MqMessageUtils</p>
|
||||
* <p>Description : MqMessageUtils</p>
|
||||
* <p>DevelopTools : Idea_x64_v2026.1</p>
|
||||
* <p>DevelopSystem : macOS Sequoia 15.7.5</p>
|
||||
* <p>Company : org.xujun</p>
|
||||
*
|
||||
* @author : XuJun
|
||||
* @version : 5.0.0
|
||||
* @date : 2026/2/11 19:56
|
||||
*/
|
||||
public class MqMessageUtils implements Serializable {
|
||||
|
||||
|
||||
/**
|
||||
* 消息包数据 Key
|
||||
*/
|
||||
private static final String LOG = "log";
|
||||
private static final String DATA = "data";
|
||||
|
||||
/**
|
||||
* 构建消息包
|
||||
*
|
||||
* @param data 消息内容
|
||||
* @param logTrack 日志追踪信息
|
||||
* @return java.lang.String
|
||||
**/
|
||||
public static JSONObject to(Object data, LogTrack logTrack) {
|
||||
return JSONObject.of(LOG, logTrack, DATA, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析消息包
|
||||
*
|
||||
* @param message 消息包
|
||||
* @param clazz 消息内容类型
|
||||
* @return MessageDto<T>
|
||||
**/
|
||||
public static <T> MessageDto<T> from(String message, Class<T> clazz) {
|
||||
JSONObject msg = JSONObject.parseObject(message);
|
||||
LogTrack log = msg.getObject(LOG, LogTrack.class);
|
||||
T data = msg.getObject(DATA, clazz);
|
||||
return new MessageDto<>(log, data);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
xtools.boot.mq.base.BootMqBaseConfiguration
|
||||
Reference in New Issue
Block a user