[Spring Cloud]Stream组件介绍-天天报资讯
SCS 在 3.x 做了很大的改动,废除了诸如 @StreamListener、@Input、@Output 等类,保留了 Binder、Binding,并提供了批量消费的支持。 本着学新不学旧的原则,本文将介绍 SCS 3.x 相关内容。 由于关于 spring cloud stream kafka 的文档比较充足,本文就此为例介绍 SCS。
【资料图】
Binder 是提供与外部消息中间件集成的组件,为 Binding 提供了 2 个方法,分别是 bindConsumer 和 bindProducer,它们用于构造生产者和消费者。 Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。
Binder 事务
不要在事务中尝试重试和提交死信。重试时,事务可能已经回归。如果想要提交死信用于善后,那么可以使用 DefaultAfterRollbackProcessor
以在回滚之后提交死信。
Error Channel
binder 会使用 Error Channel 向消费者传递异常,同时可以配置异步生产者发生异常时将异常传递到 Error Channel。
Dead-Letter
默认情况下,某 topic 的死信队列将与原始记录存在于相同分区中。
死信队列中的消息是允许复活的,但是应该避免消息反复消费失败导致多次循环进入死信队列。
应该使用一个专门的处理程序用来对这些死信队列的信息进行善后。
Consumer 消费者
顾名思义,Consumer 定义的是一个消费者,他是一个函数式接口,提供了消费消息的方法。我们可以直接在 Bean 声明中使用 lambda 表达式实现它。
值得注意的是,Consumer 还是一个泛型接口,通过泛型来绑定消息的类型。接收消息的类型我们会用到 KStream 类,他将与发送消息时定义的 KStream 对应,是键值对组成的抽象记录流,但相同 key 的记录不会被覆盖。
@Beanpublic Consumer> consumer() {return input -> input.foreach((key, value) -> {do consume;});}
当我们在应用程序中声明返回 Consumer 的 Bean,那么这个 Bean 就会自动接入消息队列。另外,我们需要用到 spring.cloud.stream.bindings.{beanName}-in-{idx}={topic}
来设置订阅的消息主题。默认情况下,topic 与 beanName 同名。
spring.cloud.stream.bindings.consumer-in-0 = userBuy
当接收到消息时,就会调用 Consumer 定义的 accept 方法进行消息消费。
发送消息 生产者
SCS 并没有对发送消息做出一个具体的封装,而是建议通过各个消息队列支持的 client 或者 template 发送消息。
kafkaTemplate.send(message);
Function 加工厂
但有时候,我们需要对数据进行加工后发送回消息队列中,这个时候就会用到 Function。
它和 Consumer 类似,但是方法多了一个返回值。同样的,这个返回值需要用到 KStream 类,这样就能够支持将处理完的数据返回到消息队列。
@Beanpublic Function, KStream
spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
来设置出口的消息主题。默认情况下,topic 与 beanName 同名。
Function 相比生产者或消费者,更像是将消息进行加工,这个过程可以对消息进行一系列的处理,包括消息拆分,消息过滤和计算中间结果等。常见的一个用途就是国际化消息和多平台通知。
国际化消息就是对消息进行本地化,Function 就类似一个翻译官的功能,将翻译好的消息转达给消费者。
有时候我们也需要同时对多个平台推送通知,比如邮件、短信等。一般来说,邮件服务器和短信服务器不会写死消息的模板以提高泛用性,这个时候就需要中间人对消息进行加工,嵌入对应平台的模板。
多输出绑定
上面提到了消息拆分,Function 允许多个 topic 的消息发送,返回值上会用到 KStream 数组,然后配置上会用到方才展示的 spring.cloud.stream.bindings.{beanName}-out-{idx}={topic}
,idx 代表的就是返回值 KStream 在数组中的索引。
多输入绑定
多输入绑定在普通应用程序上很少用到,一般用于分布式计算。比如除法计算需要同时拥有除数和被除数。分布式计算也是 SCS 的一大用处之一,知识盲区,在此不多做介绍。
KStream
上面多次提到了 KStream,它实质上是一个顺序且可不断增长的数据集,是数据流的一种。
KTable
KTable 与 KStream 类似,但是与 KStream 不同的是,他不允许 key 的重复。 面对相同 key 的数据,会选择更新而不是插入。 KTable 实质上也是数据流,他的实现类同样继承了 AbstractStream。 可以将他看成某一时刻,KStream 的最新快照。
关键词:
- 不愧电动爹?零下25度暴风雪中的特斯拉Model 3:电池预热45分钟后才充上电
- 12月29日基金净值:前海开源沪港深优势精选混合A最新净值1.636,涨0.62%-全球聚看点
- 杭州钱塘区下沙派出所新地址在哪里?
- 第十四届“全国文化企业30强”发布
- 【环球新视野】2022张家港市人力资源和社会保障局招聘2名人员报名时间+方式
- 河北省保定市和涉县在列 第一批全民运动健身模范市、县命名
- 焦点滚动:建信国证新能源车电池ETF净值下跌2.28% 请保持关注
- 实时:离别句子文案【离别的句子最走心文案】
- 2023济宁元旦滑雪好去处
- 贤丰控股:您提问的同样问题公司已请参阅
-
过去一年,杭州人在哪些消费上踩坑最多?3·15临近,你有什么需要帮忙?_环球视讯
本周三就是3月15日,即3·15消费者权益日。每年一到这个时候,都是消费者学习和了解各种消费陷阱,避免踩坑的好时机。中消协确定2023年消费年
-
怎样知道qq好友隐身了没(怎样知道qq好友隐身) 重点聚焦
1、打开手机QQ客户端,2、指向“打开方式”。3、点击4、如果5、如果。本文到此结束,希望对大家有所帮助。本文由用户上传
-
万和携手航天研究院建立联合研究中心,合力技术研究与创新|当前讯息
3月10日上午,广东万和新电气股份有限公司与中国航天空气动力技术研究院在北京正式宣布,双方将合力建设空气动力联合研究中心,联合开展基础技
-
当前滚动:羊霍的功效(羊霍的功效与作用)
羊霍的功效,羊霍的功效与作用很多人还不知道,现在让我们一起来看看吧!1 调节内分泌,增强免疫力:淫羊藿其实是一种草本植物,或者有些人服
-
麦饭石水杯的作用与功效(盘点麦饭石水杯作用) 天天精选
麦饭石水杯的作用与功效,盘点麦饭石水杯作用很多人还不知道,现在让我们一起来看看吧!解答:1、麦饭石有吸附性:麦饭石多孔,吸附能力强。2
-
太阳系最亮的星星是那一颗恒星_太阳系最亮的星星是那一颗
1、金星金星是太阳系八大行星之一,按离太阳由近及远的次序排列为第二颗。2、在中国古代又称为“长庚”、“启明”或“太白”。
-
新资讯:吉林高速公安白城分局巡逻三大队联合路政综合执法
为进一步规范辖区通行秩序,持续严厉打击超载超限违法行为,吉林高速公安白城分局持续推进落实“一路三方”协作机制,于3月11日联合路政综合执
-
快讯2023-03-12 18:27:35 天天速讯
3月12日电,沙特阿美CEO纳赛尔表示,预计到2023年底,石油需求将增至近1 02亿桶 日。
-
世界快看:微纳米测量技术
1、《微纳米测量技术》是2006年9月1日清华大学出版社出版的图书,作者是王伯雄。2、本书主要讲述了微纳米测量技术的
-
今日普洱茶的泡法标准喝法 环球快讯
1、温壶涤具:先用滚水烫热茶具,主要起温壶温杯的作用,同时可以涤具,随后放入茶叶;润茶:冲入约茶具容量1 4的滚水,
X 关闭
[Spring Cloud]Stream组件介绍-天天报资讯
我脑海中的橡皮擦正片_我脑海中的橡皮擦|全球观察
注意!海汽集团:海南高速拟减持公司股份不超过632万股
天天最新:“一根筋”重建膝关节韧带,协和金银湖骨科团队助陕西小伙重返球场
巾帼柔情铸警魂·铿锵玫瑰别样红丨钟山公安开展“三八”妇女节系列活动|每日讯息
X 关闭
得知西安疫情防控“升级” 男子夜骑共享单车回咸阳淳化
中国医生将任SIU主席背后:从追随者同行者到引领者
海南省通报政法队伍教育整顿成果
云南两地发现核酸阳性人员 西安实行最严格的社会面管控
广东梅州大埔中央红色交通线沿线发现多株百岁古树