Spring Cloud Stream 3.x+kafka 3.8整合

Spring Cloud Stream 3.x+kafka 3.8整合,文末有完整项目链接

  • 前言
  • 一、如何看官方文档(有深入了解需求的人)
  • 二、kafka的安装
    • tar包安装
    • docker安装
  • 三、代码中集成
    • 创建一个测试topic:test
    • producer代码
    • producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)
    • Consumer代码
    • Consumer 配置
    • Consumer 2的代码和配置
  • 四、测试
  • 五、结语

前言

上一篇文章,我们用Spring Cloud Stream整合了RocketMQ:SpringCloud Alibaba五大组件之——RocketMQ,趁着此机会,继续学习了解一下Spring Cloud Stream,本文就以kafka为例。本文项目用到的所有Maven依赖和版本,都是和前面几篇文章一样。

由于整合kafka 不需要用到Cloud Alibaba一系列的技术,所以下载到源码运行不起来的,请删除mysql,nacos,dubbo,redis等一系列相关的依赖和代码。本文写下的时候,kafka最新版本为3.8版本,所以就以3.8版本举例说明。

官方中文文档:https://kafka1x.apachecn.org/documentation.html
官网文档:https://kafka.apache.org/documentation/
中文文档的版本比较老,建议大家对照着英文文档3.8版本的,相互结合起来看。

一、如何看官方文档(有深入了解需求的人)

1.基础操作:建议大家看operation一栏,后面我会简单贴出基本安装使用流程
在这里插入图片描述
2.配置建议看中文版本
在这里插入图片描述

二、kafka的安装

tar包安装

  1. 下载链接:kafka_2.13-3.8.0.tgz

  2. 选择一个合适的位置解压

    tar -zxvf kafka_2.12-3.8.0.tgz
    
  3. 启动自带的zookeeper(后台启动)

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  4. 修改kafka server的配置文件,便于外网能够访问
    找到bin\config目录下的server.properties文件
    修改以下两行listeners照着我这样写,advertised.listeners修改为你服务器的ip,端口默认9092

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://172.16.72.133:9092
    
  5. 启动kafka server(后台启动)

    nohup bin/kafka-server-start.sh config/server.properties &
    
  6. 稍微扩展一下,集群的搭建,比如我们要扩展为三个集群代理:
    首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties
    

    编辑这些新文件并设置如下属性:

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dir=/tmp/kafka-logs-1
     
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dir=/tmp/kafka-logs-2
    

    broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的,必须重写端口和日志目录
    然后启动就好了:低一个启动的为leader,如果杀死leader,会重新推荐一个leader出来

    bin/kafka-server-start.sh config/server-1.properties &
    bin/kafka-server-start.sh config/server-2.properties &
    

    但是这样扩展的唯一不好的一点就是,会没有以前的数据,新的topic不影响,具体操作大家可以看文档。

docker安装

  1. 拉取镜像

    docker pull apache/kafka:3.8.0
    
  2. 启动

    docker run -p -d 9092:9092 apache/kafka:3.8.0
    

三、代码中集成

创建一个测试topic:test

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

所有的topic的操作,都可以用kafka-topics.sh来操作,具体的可以看文档。老版本的启动是加–zookeeper的,会报错not found,新版本要用–bootstrap-server。

producer代码

@RestController
@RequestMapping("/mqtest")
public class KafkaTestController {
    private static final Logger logger = LoggerFactory.getLogger(KafkaTestController.class);
    @Autowired
    private StreamBridge streamBridge;

    @RequestMapping("/test1")
    public void testOne() {
        Message<SimpleMsg> msg = new GenericMessage<>(new SimpleMsg("我是 broadcastMessage", new Date().toString()));
        streamBridge.send("broadcastMessage-out-0", msg);
    }
}

自定义消息体SimpleMsg,此类不需要序列化

@AllArgsConstructor
@Data
@NoArgsConstructor
public class SimpleMsg{
    private String msg;
    private String time;
}

producer配置(配置的格式,上篇文章我有详细解释,大家可以回看)

key:serializer 重中之重,发送对象消息的时候,解决转换错误,SpringCloudStream默认的是ByteArraySerializer,但是kafkamore默认的是String

spring:
  cloud:
    stream:
      kafka:
        binder:
          ##kafka的server地址
          brokers: 172.16.72.133:9092
          ##如果topic不存在则创建
          auto-create-topics: true
          auto-add-partitions: true #自动分区
          min-partition-count: 1 #最小分区
          ##这个序列化很关键,如果不加这个配置,则发送对象消息时候,会报转换错误
          configuration:
            key:
              serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      bindings:
        broadcastMessage-out-0:
          destination: test
          content-type: application/json

Consumer代码

 @Bean
 public Consumer<Message<SimpleMsg>> broadcastMessage() {
     return msg -> {
         log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg() + msg.getPayload().getTime());
     };
 }

Consumer 配置

项目中有更详细的配置,这里为了测试用的简化版

spring:
  cloud:
    stream:
      function:
        definition: broadcastMessage
      kafka:
        binder:
          brokers: 172.16.72.133:9092
          auto-create-topics: true
          configuration:
            key:
              serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      bindings:
        broadcastMessage-in-0:
          destination: test
          group: test-topic-account
          content-type: application/json

Consumer 2的代码和配置

项目中还有一个friend模块,当做第二个消费者,代码和配置和Consumer 1完全一样,唯一不同的就是可以设置group不同,这里就不贴代码了。

四、测试

生产者发送两个消息
在这里插入图片描述
两个消费者实例,分组一样,则轮询消费,分组不同,则单独消费
account模块消费者:
在这里插入图片描述
friend模块消费者:
在这里插入图片描述

五、结语

到这篇文章,这一个系列基本就算结束了,后面可能会补充一下内容,或者去写点其他的东西。或者说,去研究下springboot的集成而不用Spring Cloud Stream,后面再说吧。

本文完整项目代码GitHub地址,请切换到kafka分支
https:https://github.com/wangqing-github/DubboAndNacos.git
ssh:git@github.com:wangqing-github/DubboAndNacos.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/890316.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习】逻辑回归|分类问题评估|混淆矩阵|ROC曲线|AUC指标 介绍及案例代码实现

文章目录 逻辑回归逻辑回归简介逻辑回归的数学基础逻辑回归原理概念损失函数 逻辑回归API函数和案例案例癌症分类预测 分类问题评估混淆矩阵分类评估方法 - 精确率 召回率 F1ROC曲线 AUC指标案例AUC 计算的API分类评估报告api 电信客户流失预测案例 逻辑回归 逻辑回归简介 ​…

matlab不小心删除怎么撤回

预设项——>删除文件——>移动至临时文件夹 tem临时文件夹下

SwiftUI 6.0(iOS 18)新增的网格渐变色 MeshGradient 解惑

概述 在 SwiftUI 中&#xff0c;我们可以借助渐变色&#xff08;Gradient&#xff09;来实现更加灵动多彩的着色效果。从 SwiftUI 6.0 开始&#xff0c;苹果增加了全新的网格渐变色让我们对其有了更自由的定制度。 因为 gif 格式图片自身的显示能力有限&#xff0c;所以上面的…

【自动驾驶汽车通讯协议】GMSL通信技术以及加串器(Serializer)解串器(Deserializer)介绍

文章目录 0. 前言1. GMSL技术概述2. 为什么需要SerDes&#xff1f;3. GMSL技术特点4.自动驾驶汽车中的应用5. 结论 0. 前言 按照国际惯例&#xff0c;首先声明&#xff1a;本文只是我自己学习的理解&#xff0c;虽然参考了他人的宝贵见解及成果&#xff0c;但是内容可能存在不准…

六西格玛黑带项目:TBX-02无人机飞行稳定性提升——张驰咨询

一、项目背景与问题定义 TBX-02是该公司最新发布的消费级无人机&#xff0c;面向摄影爱好者和户外探险者。产品上市后&#xff0c;通过客户反馈和实际测试数据发现&#xff0c;该无人机在复杂飞行环境中&#xff0c;如强风或快速移动时&#xff0c;存在明显的飞行抖动和稳定性…

RabbitMQ初识

目录 Kafka RocketMQ RabbitMQ MQ界面(它使用的端口号5672&#xff0c;界面是15672&#xff09; 如何添加一个虚拟机&#xff0c;点击右侧 Topics&#xff08;通配符模式&#xff09; 发布确认机制 持久性(可靠性保证的机制之一) JDK17,Linux服务器Ubuntu 什么是MQ 实…

前端开发笔记--html 黑马程序员2

文章目录 前端常用标签一、标题标签二、段落标签和换行标签和水平线标签三、文本格式化标签![请添加图片描述](https://i-blog.csdnimg.cn/direct/87583fa23fe04229b016912051f3fc45.png)四、盒子标签五、图像标签六、连接标签七、注释和特殊字符 八、表格标签的基本使用九、列…

48 Redis

48 Redis 前言 Redis&#xff08;Remote Dictionary Server )&#xff0c;即远程字典服务。是一个开源的使用ANSI C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库&#xff0c;并提供多种语言的API。 redis会周期性的把更新的数据写入磁盘或者把修改操…

数据结构-5.6.二叉树的先,中,后序遍历

一.遍历&#xff1a; 二.二叉树的遍历&#xff1a;利用了递归操作 1.简介&#xff1a; 二叉树的先序遍历&#xff0c;中序遍历&#xff0c;后序遍历都是以根结点遍历顺序为准的&#xff0c;如先序遍历就先遍历根结点 2.实例&#xff1a; 例一&#xff1a; 例二&#xff1a; …

HarmonyOS NEXT应用开发实战(二、封装比UniApp和小程序更简单好用的网络库)

网络访问接口&#xff0c;使用频次最高。之前习惯了uniapp下的网络接口风格&#xff0c;使用起来贼简单方便。转战到鸿蒙上后&#xff0c;原始网络接口写着真累啊&#xff01;目标让鸿蒙上网络接口使用&#xff0c;简单程度比肩uniapp&#xff0c;比Axios更轻量级。源码量也不多…

JUC并发编程进阶1:线程基础知识复习

1 从start一个线程说起 在 Java 中&#xff0c;Thread 类是用于创建和管理线程的核心类。通过调用 Thread 类的 start() 方法&#xff0c;可以启动一个新的线程&#xff0c;并执行线程的 run() 方法。下面我们来详细分析一下 start() 方法的实现。 1.1 代码示例 首先&#x…

前端开发笔记--html 黑马程序员1

文章目录 前端开发工具--VsCode前端开发基础语法VsCode优秀插件Chinese --中文插件Auto Rename Tag --自动重命名插件open in browserOpen in Default BrowserOpen in Other Browser Live Server -- 实时预览 前端开发工具–VsCode 轻量级与快速启动 快速加载&#xff1a;VSCo…

大数据毕业设计选题推荐-音乐数据分析系统-音乐推荐系统-Python数据可视化-Hive-Hadoop-Spark

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

ansible自动化运维,一些基础命令、更方便掌握ansible。

1.先准备三台机子&#xff0c;一台ansible服务端、和两台客户端&#xff0c;配置客户端主机名、cinder和compute。 192.168.10.202ansible客户端192.168.10.56cinder客户端192.168.10.55compute客户端 2.下载ansible&#xff08;客户端&#xff09;,准备repo文件。 #编写文件…

“网络安全等级保护测评入门:基础概念与重要性“

网络安全等级保护测评&#xff08;简称“等保测评”&#xff09;是依据国家网络安全等级保护制度&#xff0c;对信息系统安全等级进行评估和评定的过程。它是提高信息系统安全性、保障信息安全的重要手段。以下是关于等保测评的基础概念与重要性的详细解读&#xff1a; 一、等…

在docker的容器内如何查看Ubuntu系统版本

文章目录 写在前面一、问题描述二、解决方法参考链接 写在前面 自己的测试环境&#xff1a; docker 一、问题描述 由于 lsb_release -a 只能查看自己电脑&#xff08;宿主机&#xff09;的系统版本&#xff0c;如果在docker的容器内又应该如何查看Ubuntu系统版本呢&#xff…

IDEA运行Java程序时出错。提示:命令行过长。通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。

文章目录 一、遇到问题二、分析问题三、解决办法 一、遇到问题 运行 OpenCVUtils.test 时出错。命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行&#xff0c;然后重新运行。二、分析问题 IDEA提示很明显了。 三、解决办法 运行——>编辑配置 运行/调试配置——&g…

欧科云链研究院深掘链上数据:洞察未来Web3的隐秘价值

目前链上数据正处于迈向下一个爆发的重要时刻。 随着Web3行业发展&#xff0c;公链数量呈现爆发式的增长&#xff0c;链上积聚的财富效应&#xff0c;特别是由行业热点话题引领的链上交互行为爆发式增长带来了巨量的链上数据&#xff0c;这些数据构筑了一个行为透明但与物理世…

模型 知识诅咒

系列文章 分享 模型&#xff0c;了解更多&#x1f449; 模型_思维模型目录。知者难悟无知惑。 1 知识诅咒案例 1.1 会议室的误解 李经理是一家科技公司的产品经理&#xff0c;他负责领导一个新产品的开发项目。项目团队由不同背景和经验的成员组成&#xff0c;包括新入职的员…

kibana 删除es指定数据,不是删除索引

1 查询条件查询出满足条件的数据 GET /order_header_idx_202410/_search {"from":0,"size":10,"query":{"bool":{"filter":[{"term":{"oh_tenantId":{"value":"0211000001",&…