Spring Cloud: Spring Cloud Stream (Message-Driven Microservices)
2019-11-23 16:02:05 Source: cnblogs (博客园)
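Spring Cloud Stream is a framework for building message-driven microservices. It builds on Spring Boot and uses Spring Integration to connect to message broker middleware (RabbitMQ, Kafka, and so on). It provides opinionated auto-configuration and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
An application talks to a Spring Cloud Stream binder through input and output channels, and configuration decides how those channels are bound. The binder, in turn, is responsible for talking to the middleware.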
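The three concepts can be illustrated with a small in-memory model. This is only a sketch in plain Java: `ConceptSketch`, `Destination`, `subscribe`, `publish`, and `partitionFor` are hypothetical names invented for illustration, not Spring Cloud Stream APIs, and the round-robin choice inside a group is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model only; none of these types are Spring Cloud Stream APIs.
public class ConceptSketch {

    // A hypothetical destination (think: exchange or topic) that consumer groups subscribe to.
    public static class Destination {
        // group name -> inboxes of the members of that group, in registration order
        private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        // group name -> number of messages delivered to that group so far
        private final Map<String, Integer> delivered = new LinkedHashMap<>();

        // Registers a new consumer instance in the given group and returns its inbox.
        public List<String> subscribe(String group) {
            List<String> inbox = new ArrayList<>();
            groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
            delivered.putIfAbsent(group, 0);
            return inbox;
        }

        // Publish-subscribe: every group receives the message.
        // Consumer group: only one member per group receives it (round-robin here).
        public void publish(String message) {
            for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
                List<List<String>> members = entry.getValue();
                int count = delivered.get(entry.getKey());
                members.get(count % members.size()).add(message);
                delivered.put(entry.getKey(), count + 1);
            }
        }
    }

    // Partitioning: messages with the same key always map to the same partition.
    public static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        Destination orders = new Destination();
        List<String> billing1 = orders.subscribe("billing");
        List<String> billing2 = orders.subscribe("billing");
        List<String> audit = orders.subscribe("audit");

        orders.publish("order-1");
        orders.publish("order-2");

        // prints: billing1=[order-1] billing2=[order-2] audit=[order-1, order-2]
        System.out.println("billing1=" + billing1 + " billing2=" + billing2 + " audit=" + audit);
        System.out.println("partition for customer-42: " + partitionFor("customer-42", 3));
    }
}
```

Both groups see every message, while the two "billing" instances split the messages between them. In a real deployment the broker and binder do this work, and the distribution inside a group is not necessarily round-robin.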
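Development tool: IntelliJ IDEA 2019.2.3
I. Server
1. Create the project
In IDEA, create a new Spring Boot project named "spring-server". Choose Spring Boot version 2.1.10 and, on the Dependencies screen, select Spring Cloud Discovery -> Eureka Server.
The complete pom.xml is as follows: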
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
Set the port to 8761. Disable registering this instance with the Eureka server, and disable fetching the registry from the Eureka server.
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
3. Edit the application class
Add the @EnableEurekaServer annotation.
package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }
}
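II. Message Producer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-producer". Choose Spring Boot version 2.1.10 and, on the Dependencies screen, select Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. It transitively brings in spring-cloud-stream and the RabbitMQ binder (spring-cloud-stream-binder-rabbit).
The complete pom.xml is as follows: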
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
The pom.xml pulls in the RabbitMQ binder, which by default connects to port 5672 on localhost, so the rabbitmq section below can be omitted.
server:
  port: 8081
spring:
  application:
    name: spring-producer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
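3. Write the send service
The sendOrder method is annotated with @Output("myInput"), which creates a message channel named myInput. Calling the method returns that channel, and messages sent through it are delivered to myInput.
If the myInput argument is omitted, the method name is used as the channel name.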
package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {

    @Output("myInput")
    SubscribableChannel sendOrder();
}
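The naming rule described above (explicit annotation value first, method name as the fallback) can be sketched with plain reflection. `DemoOutput` and `DemoSendService` below are hypothetical stand-ins defined only for this example; they are not the real @Output annotation or the SendService interface, and the code does not claim to reproduce how Spring Cloud Stream resolves names internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class ChannelNameSketch {

    // Hypothetical stand-in for @Output; not the real Spring Cloud Stream annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DemoOutput {
        String value() default "";
    }

    // Hypothetical binding interface that mirrors the shape of SendService.
    public interface DemoSendService {
        @DemoOutput("myInput")
        Object sendOrder();

        @DemoOutput
        Object sendInvoice();
    }

    // Returns the annotation value when one is given, otherwise the method name.
    public static String channelName(Class<?> bindingInterface, String methodName) {
        for (Method method : bindingInterface.getMethods()) {
            if (method.getName().equals(methodName)) {
                DemoOutput output = method.getAnnotation(DemoOutput.class);
                if (output != null) {
                    return output.value().isEmpty() ? method.getName() : output.value();
                }
            }
        }
        throw new IllegalArgumentException("No output method named " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(channelName(DemoSendService.class, "sendOrder"));   // prints: myInput
        System.out.println(channelName(DemoSendService.class, "sendInvoice")); // prints: sendInvoice
    }
}
```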
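4. Edit the application class
Add the @EnableBinding annotation to turn on binding in the Spring container, passing SendService.class as its argument. When the container starts, it automatically binds the channels defined in the SendService interface.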
package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }
}
5. Add a controller class
The controller calls the SendService send method to publish a message to the message broker.
package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendRequest() {
        // Build the message with a byte[] payload
        Message<byte[]> msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        // Send the message through the myInput channel
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}
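III. Message Consumer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-consumer". Choose Spring Boot version 2.1.10 and, on the Dependencies screen, select Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. The complete pom.xml is as follows: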
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
server:
  port: 8080
spring:
  application:
    name: spring-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
3. Write the channel interface for receiving messages
package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {

    @Input("myInput")
    SubscribableChannel myInput();
}
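4. Edit the application class
Bind the message channel in the same way as in the producer. The input channel has the same name as the producer's output channel (myInput) and no destination is configured, so both sides bind to the same destination on the broker.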
package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    // Subscribe to messages on the myInput channel
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        // The printed prefix means "Received message:"
        System.out.println("接收到的消息:" + new String(msg));
    }
}
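The producer encodes the payload with getBytes() and the consumer decodes it with new String(msg), both of which fall back to the platform default charset. That works when both services run on the same machine, but the two sides could disagree if they run with different defaults. A minimal sketch of the same round trip with an explicit charset follows; `PayloadCodecSketch`, `encode`, and `decode` are hypothetical helper names, and choosing UTF-8 is an assumption, not something the original code does.

```java
import java.nio.charset.StandardCharsets;

public class PayloadCodecSketch {

    // Producer side: turn the text into bytes with a fixed charset.
    public static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: decode the bytes with the same fixed charset.
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello world");
        System.out.println("Received message: " + decode(payload)); // prints: Received message: hello world
    }
}
```

In the tutorial's classes this would amount to passing StandardCharsets.UTF_8 to getBytes in ProducerController and to the String constructor in the receive method.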
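5. Test
(1) Check in the system services that RabbitMQ is running (it starts automatically by default);
(2) Start spring-server (port 8761);
(3) Start spring-producer (port 8081);
(4) Start spring-consumer (port 8080);
(5) Open http://localhost:8081/send in a browser. The spring-consumer console prints:
接收到的消息:hello world
The Chinese prefix means "Received message:". This shows that the consumer can now receive messages from the message broker.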
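The browser request in step (5) can also be issued from code. The following is a minimal sketch using only the JDK's HttpURLConnection (it compiles on Java 8, the version used by the projects above). `SendEndpointCheck`, `sendUrl`, and `get` are hypothetical names, and the three-second timeouts are arbitrary assumptions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SendEndpointCheck {

    // Builds the URL of the producer's /send endpoint.
    public static String sendUrl(String host, int port) {
        return "http://" + host + ":" + port + "/send";
    }

    // Performs a GET request and returns the response body as text.
    public static String get(String url) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(3000);
        connection.setReadTimeout(3000);
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            return body.toString();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) {
        String url = sendUrl("localhost", 8081);
        try {
            // With spring-producer running, the body should be the controller's return value.
            System.out.println(url + " -> " + get(url));
        } catch (IOException e) {
            System.out.println("Could not reach " + url + ": " + e.getMessage());
        }
    }
}
```

If the producer is up, the response body is the string returned by ProducerController, and the consumer console should show the received message shortly afterwards.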
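IV. Switching the Binder
The example above uses RabbitMQ as the message broker. To use Kafka instead, swap the Maven dependency.
In the pom.xml of both the producer and the consumer, replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka.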
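As a sketch, the replaced dependency entry would look like this, assuming the version is still managed by the spring-cloud-dependencies BOM already imported in dependencyManagement:

```xml
<!-- Replaces the spring-cloud-starter-stream-rabbit dependency in both projects -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
```

With this change the spring.rabbitmq settings in application.yml no longer apply. The Kafka binder should look for a broker on localhost by default, so a local Kafka instance needs to be running before the services are started.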
Original article: https://www.cnblogs.com/gdjlc/p/11920463.html