|
@@ -0,0 +1,449 @@
|
|
|
+package com.xxh.cloud.framework.mq.kafka;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.ListUtil;
|
|
|
+import cn.hutool.core.util.ArrayUtil;
|
|
|
+import com.alibaba.fastjson2.JSON;
|
|
|
+import lombok.extern.log4j.Log4j2;
|
|
|
+import org.apache.kafka.clients.admin.*;
|
|
|
+import org.apache.kafka.clients.producer.ProducerRecord;
|
|
|
+import org.apache.kafka.common.KafkaFuture;
|
|
|
+import org.apache.kafka.common.TopicPartitionInfo;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.kafka.core.KafkaTemplate;
|
|
|
+import org.springframework.kafka.support.SendResult;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.concurrent.ListenableFuture;
|
|
|
+import org.springframework.util.concurrent.ListenableFutureCallback;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * https://blog.csdn.net/cold___play/article/details/132398946?spm=1001.2101.3001.6650.2&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-2-132398946-blog-125011879.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EYuanLiJiHua%7EPosition-2-132398946-blog-125011879.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=4
|
|
|
+ *
|
|
|
+ * https://masiyi.blog.csdn.net/article/details/121856703?spm=1001.2101.3001.6650.16&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-16-121856703-blog-127277614.235%5Ev38%5Epc_relevant_anti_vip_base&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-16-121856703-blog-127277614.235%5Ev38%5Epc_relevant_anti_vip_base&utm_relevant_index=22
|
|
|
+ *
|
|
|
+ *
|
|
|
+ * */
|
|
|
+@Log4j2
|
|
|
+@Component
|
|
|
+public class KafkaProducerUtils {
|
|
|
+
|
|
|
+ private static final String PUSH_MSG_LOG = "准备发送消息为:{}";
|
|
|
+
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private KafkaTemplate<String,Object> kafkaTemplate;
|
|
|
+
|
|
|
+
|
|
|
+ @Value("${spring.kafka.bootstrap-servers}")
|
|
|
+ private String springKafkaBootstrapServers;
|
|
|
+
|
|
|
+ private AdminClient adminClient;
|
|
|
+
|
|
|
+
|
|
|
+ private String toJSONString(Object obj){
|
|
|
+ return JSON.toJSONString(obj);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化AdminClient
|
|
|
+ * '@PostConstruct该注解被用来修饰一个非静态的void()方法。
|
|
|
+ * 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
|
|
|
+ * PostConstruct在构造函数之后执行,init()方法之前执行。
|
|
|
+ */
|
|
|
+ @PostConstruct
|
|
|
+ private void initAdminClient() {
|
|
|
+ System.out.println(" +++++++ Kafka Init AdminClient");
|
|
|
+ log.debug(" +++++++ Kafka Init AdminClient");
|
|
|
+ Map<String, Object> props = new HashMap<>();
|
|
|
+ props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
|
|
|
+ adminClient = KafkaAdminClient.create(props);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取全部topic
|
|
|
+ */
|
|
|
+ public List<String> getAllTopic() {
|
|
|
+ try {
|
|
|
+ return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return ListUtil.empty();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 如果没有topic,则创建一个
|
|
|
+ * @param topicName :
|
|
|
+ * @param partitionNum : 分区
|
|
|
+ * @param replicaNum : 副本
|
|
|
+ * @return org.apache.kafka.clients.admin.CreateTopicsResult
|
|
|
+ */
|
|
|
+ public Boolean createTopic(String topicName, int partitionNum, int replicaNum){
|
|
|
+ KafkaFuture<Set<String>> topics = adminClient.listTopics().names();
|
|
|
+ try {
|
|
|
+ if (topics.get().contains(topicName)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ System.out.println("++++++++++++ 创建主题 ++++++++++++++");
|
|
|
+ NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) replicaNum);
|
|
|
+ adminClient.createTopics(Collections.singleton(newTopic));
|
|
|
+ return true;
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 新增topic,支持批量
|
|
|
+ */
|
|
|
+ public void createTopic(Collection<NewTopic> newTopics) {
|
|
|
+ adminClient.createTopics(newTopics);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取指定topic的信息
|
|
|
+ */
|
|
|
+ public String getTopicInfo(String topic) {
|
|
|
+ Collection<String> topics = Collections.singleton(topic);
|
|
|
+ return getTopicInfo(topics);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取指定topic的信息
|
|
|
+ */
|
|
|
+ public String getTopicInfo(Collection<String> topics) {
|
|
|
+ AtomicReference<String> info = new AtomicReference<>("");
|
|
|
+ try {
|
|
|
+ adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
|
|
|
+ for (TopicPartitionInfo partition : description.partitions()) {
|
|
|
+ info.set(info + partition.toString() + "\n");
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (InterruptedException | ExecutionException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ return info.get();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * */
|
|
|
+ public Boolean deleteTopic(String topicName){
|
|
|
+ KafkaFuture<Set<String>> topics = adminClient.listTopics().names();
|
|
|
+ try {
|
|
|
+ if (!topics.get().contains(topicName)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ adminClient.deleteTopics(Collections.singleton(topicName));
|
|
|
+ return true;
|
|
|
+ }catch(Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,json格式字符串的消息,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param jsonStr : 消息json字符串
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public boolean sendMessage(String topicName, String jsonStr) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future= kafkaTemplate.send(new ProducerRecord<>(topicName,jsonStr));
|
|
|
+ return dealSendResult(future);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,json格式字符串数组的消息,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param jsonStrs : 消息json字符串数组
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public Boolean[] sendMessage(String topicName, String[] jsonStrs) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ int msgLength = jsonStrs.length;
|
|
|
+ Boolean[] success = new Boolean[msgLength];
|
|
|
+ for (int i = 0; i < msgLength; i++) {
|
|
|
+ String jsonStr = jsonStrs[i];
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ jsonStr));
|
|
|
+ success[i] = dealSendResult(future);
|
|
|
+ }
|
|
|
+ return success;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,消息对象,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param obj : 消息对象
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public boolean sendMessage(String topicName, Object obj) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ String jsonStr = toJSONString(obj);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ jsonStr));
|
|
|
+
|
|
|
+ return dealSendResult(future);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,消息对象数组,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param list : 消息对象数组
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public Boolean[] sendMessage(String topicName, List<Object> list) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ Boolean[] success = new Boolean[list.size()];
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ Object obj = list.get(i);
|
|
|
+ String jsonStr = toJSONString(obj);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ jsonStr));
|
|
|
+ success[i] = dealSendResult(future);
|
|
|
+ }
|
|
|
+ return success;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,json格式字符串的消息,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param jsonStr : 消息json字符串
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public boolean sendMessage(String topicName, String key, String jsonStr) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ key, jsonStr));
|
|
|
+
|
|
|
+ return dealSendResult(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,json格式字符串数组的消息,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param jsonStrs : 消息json字符串数组
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public Boolean[] sendMessage(String topicName, String key, String[] jsonStrs) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ int msgLength = jsonStrs.length;
|
|
|
+ Boolean[] success = new Boolean[msgLength];
|
|
|
+ for (int i = 0; i < msgLength; i++) {
|
|
|
+ String jsonStr = jsonStrs[i];
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ key, jsonStr));
|
|
|
+ success[i] = dealSendResult(future);
|
|
|
+ }
|
|
|
+ return success;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,消息对象,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param obj : 消息对象
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public boolean sendMessage(String topicName, String key, Object obj) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ String jsonStr = toJSONString(obj);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ key, jsonStr));
|
|
|
+
|
|
|
+ return dealSendResult(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,消息对象数组,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param list : 消息对象数组
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public Boolean[] sendMessage(String topicName, String key, List<Object> list) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ Boolean[] success = new Boolean[list.size()];
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ Object obj = list.get(i);
|
|
|
+ String jsonStr = toJSONString(obj);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ key, jsonStr));
|
|
|
+ success[i] = dealSendResult(future);
|
|
|
+ }
|
|
|
+ return success;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,json格式字符串的消息,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param partition : 消息发送分区
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param jsonStr : 消息json字符串
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public boolean sendMessage(String topicName, int partition, String key, String jsonStr) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ partition, key, jsonStr));
|
|
|
+
|
|
|
+ return dealSendResult(future);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,json格式字符串数组的消息,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param partition : 消息发送分区
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param jsonStrs : 消息json字符串数组
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public Boolean[] sendMessage(String topicName, int partition, String key, String[] jsonStrs) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ int msgLength = jsonStrs.length;
|
|
|
+ Boolean[] success = new Boolean[msgLength];
|
|
|
+ for (int i = 0; i < msgLength; i++) {
|
|
|
+ String jsonStr = jsonStrs[i];
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ partition, key, jsonStr));
|
|
|
+ success[i] = dealSendResult(future);
|
|
|
+ }
|
|
|
+ return success;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,消息对象,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param partition : 消息发送分区
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param obj : 消息对象
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public boolean sendMessage(String topicName, int partition, String key, Object obj) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ String jsonStr = toJSONString(obj);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
|
|
|
+ partition, key, jsonStr));
|
|
|
+
|
|
|
+ return dealSendResult(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 传入topic名称,消息对象数组,生产者进行发送
|
|
|
+ * @param topicName : topic名称
|
|
|
+ * @param partition : 消息发送分区
|
|
|
+ * @param key : 消息key
|
|
|
+ * @param list : 消息对象数组
|
|
|
+ * @return boolean : 推送是否成功
|
|
|
+ */
|
|
|
+ public Boolean[] sendMessage(String topicName, int partition, String key, List<Object> list) {
|
|
|
+ createTopic(topicName, 5, 1);
|
|
|
+ Boolean[] success = new Boolean[list.size()];
|
|
|
+ for (int i = 0; i < list.size(); i++) {
|
|
|
+ Object obj = list.get(i);
|
|
|
+ String jsonStr = toJSONString(obj);
|
|
|
+ log.info(PUSH_MSG_LOG, jsonStr);
|
|
|
+ //发送消息
|
|
|
+ ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(
|
|
|
+ topicName, partition, key, jsonStr));
|
|
|
+ success[i] = dealSendResult(future);
|
|
|
+ }
|
|
|
+ return success;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理消息推送结果
|
|
|
+ * kafkaTemplate提供了一个回调方法addCallback,
|
|
|
+ * 我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
|
|
|
+ * @param future :
|
|
|
+ * @return boolean
|
|
|
+ */
|
|
|
+ private boolean dealSendResult(ListenableFuture<SendResult<String, Object>> future) {
|
|
|
+ final boolean[] success={false};
|
|
|
+
|
|
|
+ future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
|
|
|
+ @Override
|
|
|
+ public void onFailure(Throwable throwable) {
|
|
|
+ //发送失败的处理
|
|
|
+ log.info("生产者 发送消息失败 exMessage:{}", throwable.getMessage());
|
|
|
+ success[0] = false;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public void onSuccess(SendResult<String, Object> result) {
|
|
|
+ //成功的处理
|
|
|
+ log.info("生产者 发送消息成功, topic:{}, partition:{}, offset:{}",
|
|
|
+ result.getRecordMetadata().topic(),
|
|
|
+ result.getRecordMetadata().partition(),
|
|
|
+ result.getRecordMetadata().offset());
|
|
|
+ success[0] = true;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return success[0];
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+}
|