SpringCloudStream实战

  • 时间:2018-08-31 22:21 作者:Java架构技术分享 来源:Java架构技术分享 阅读:1351
  • 扫一扫,手机访问
摘要:Spring Cloud Stream是一个使用于构建消息驱动的微服务应使用程序的框架。Spring Cloud Stream构建于Spring Boot之上,使用于创立独立的生产级Spring应使用程序,并用Spring Integration提供与消息代理商的连接。也就是说,Spring Clo

Spring Cloud Stream是一个使用于构建消息驱动的微服务应使用程序的框架。Spring Cloud Stream构建于Spring Boot之上,使用于创立独立的生产级Spring应使用程序,并用Spring Integration提供与消息代理商的连接。也就是说,Spring Cloud Stream是构建于Spring Boot和Spring Integration之上的框架,帮助创立事件驱动或者消息驱动的微服务。

主要模型如图:

这里我们用Kafka作为消息底层设备,起因见: 为什么我们从RabbitMQ切换到apache kafka?

引入Kafka的Stream启动器:


org.springframework.cloud
spring-cloud-starter-stream-kafka
2.0.1.RELEASE

微服务架构遵循“ 智能端点和哑管 ”准则,端点之间的通信由RabbitMQ或者Apache Kafka等消息传递中间件方驱动,服务通过这些端点或者通道发布领域事件进行通信。

首先我们定义一个接口,定义输入和输出队列管道:

public interface GreetingsStreams {
String INPUT = "greetings-in";
String OUTPUT = "greetings-out";
@Input(INPUT)
SubscribableChannel inboundGreetings();
@Output(OUTPUT)
MessageChannel outboundGreetings();
}

@Input注释使用来表示输入的消息队列,通过该通道接收消息并输入当前应使用;@Output注释表示一个输出通道,通过它发布消息出去。@Input和@Output注解可以采取指定的通道名称(比方这里greetings-in greetings-out")作为参数,假如未提供名称,则用注释的方法名称。

在application.yaml或者property中具体配置该消息通道到Kafka:

spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
greetings-in:
destination: greetings
contentType: application/json
greetings-out:
destination: greetings
contentType: application/json

其中greetings-in和greetings-out配置到Kafka具体的主题topic名称为greetings,序列化类型是json,kafka默认端口在本地9092。

好了,底层基础设备准备完成,现在需要将这个设备安装到我们的应使用中。

@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}

@EnableBinding将应使用配置绑定接口GreetingsStreams中定义的通道INPUT和OUTPUT。

现在我们的应使用和消息基础设备已经绑定了,可用@StreamListener到具体方法以接收具体的流解决事件了。

@Component
@Slf4j
public class GreetingsListener {
@StreamListener(GreetingsStreams.INPUT)
public void handleGreetings(@Payload Greetings greetings) {
log.info("Received greetings: {}", greetings);
}
}

StreamListeners 是消息监听者解决方法,接收类型的传入消息Greetings,可以看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为类型Person。

上面方法是一个没有返回结果的void方法,假如有返回结果,必需用@SendTo注释指定方法返回的数据的输出绑定队列目标output,如以下示例所示;通过

@Component
@Slf4j
public class GreetingsListener {
@StreamListener(GreetingsStreams.INPUT)
@SendTo(GreetingsStreams.OUTPUT)
public String handleGreetings(Greetings greetings) {
log.info("Received greetings: {}", greetings);
return "Received greetings: {}" + greetings;
}
}

Spring cloud stream实现了一个默认的Processor类,相似我们的GreetingsStreams接口,也就是说,可以不使用自己做这个接口

public interface Processor extends Source, Sink {
}
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}

假如用默认的Processor通道名称,注意配置文件里也要配置成相应的通道名。

测试运行

有了接收方,下面我们实现一个发送方,我们通过调使用rest接口发送消息,先看看发送方代码:

@Service
@Slf4j
public class GreetingsService {
private final GreetingsStreams greetingsStreams;
public GreetingsService(GreetingsStreams greetingsStreams) {
this.greetingsStreams = greetingsStreams;
}
public void sendGreeting(final Greetings greetings) {
log.info("Sending greetings {}", greetings);
MessageChannel messageChannel = greetingsStreams.outboundGreetings();
messageChannel.send(MessageBuilder
.withPayload(greetings)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}

我们暴露一个端口来调使用这个发送方:

@RestController
public class GreetingsController {
private final GreetingsService greetingsService;
public GreetingsController(GreetingsService greetingsService) {
this.greetingsService = greetingsService;
}
@GetMapping("/greetings")
@ResponseStatus(HttpStatus.ACCEPTED)
public void greetings(@RequestParam("message") String message) {
Greetings greetings = Greetings.builder()
.message(message)
.timestamp(System.currentTimeMillis())
.build();
greetingsService.sendGreeting(greetings);
}
}

也就是说:这个发送方REST和发送服务 与我们的GreetingsListener是通过消息系统通讯的,不是直接在发送服务里调使用GreetingsListener的方法,这样这两者之间就解耦了。

下面我们使用postman调使用:

http://localhost:8080/greetings?message=hello

控制台结果输出:

c.e.c.GreetingsService : Sending greetings Greetings(timestamp=1535614400754, message=hello)

c.e.c.GreetingsListener : Received greetings: Greetings(timestamp=1535614400754, message=hello)

一个发送和一个接受完成了一个请求调使用,假如GreetingsListener还有返回结果,是放在greetings-out之中的,那么GreetingsListener就变成发送方了,我们也可以参考这套做法再做个监听器。

  • 全部评论(0)
最新发布的资讯信息
【系统环境|】在Qt中如何设置窗体的背景图片(2025-10-29 20:27)
【系统环境|】无声无名 | 杨·罗威斯终身回顾展(2025-10-29 20:26)
【系统环境|】从零开始学Qt(22):QSS详解(3)- 盒子模型(2025-10-29 20:25)
【系统环境|】Quarkus vs Spring Boot 集成 RabbitMQ 谁更香?(2025-10-29 20:24)
【系统环境|】RabbitMQ相关概念及代码示例(2025-10-29 20:24)
【系统环境|】Spring Boot + RabbitMQ:轻松掌握五种基本工作模式(2025-10-29 20:23)
【系统环境|】一篇文章带你彻底玩转-RabbitMQ(2025-10-29 20:22)
【系统环境|】私有云平台搭建——史上最详细(2025-10-29 20:21)
【系统环境|】RabbitMQ最全详解(万字图文总结)(2025-10-29 20:20)
【系统环境|】.Net/C#全网最火RabbitMQ操作【强烈推荐】(2025-10-29 20:20)
手机二维码手机访问领取大礼包
返回顶部