[关闭]
@levinzhang 2022-08-01T22:09:37.000000Z 字数 14010 阅读 261

使用Apache Kafka实现Quarkus的反应式消息

by
* Alex Soto

摘要:

现在,数据处理和消费的方式与以往的实践是不一样的。在过去,数据会存储到一个数据库中,并且会进行批处理以获取分析结果。Apache Kafka是一个分布式的事件存储和流处理平台,能够实时地存储、消费和处理数据流。在本文中,我们将会学习如何使用Apache Kafka和Quarkus生产和消费数据。


核心要点

什么是Apache Kafka以及为何要使用它?

现在,数据处理和消费的方式与以往的实践是不一样的。在过去,数据会存储到一个数据库中,并且会进行批处理以获取分析结果。

尽管这种方式依然占有一席之地,但是更现代化的平台能够让我们在数据进入系统的时候对其进行实时处理。

Apache Kafka(简称为Kafka)是一个分布式的事件存储和流处理平台,能够存储、消费和处理数据流。

要理解Kafka是如何运行的,我们需要掌握五个基本概念:

Apache Kafka的一个重要特点是它在创建时充分考虑到了可扩展性和容错性,使其非常适合高性能的应用。我们认为,Kafka可以取代一些传统的消息系统,比如Java Message Service(JMS)和Advanced Message Queuing Protocol(AMQP)。

Apache Kafka能够与当前使用的大多数语言进行集成,但是在本文中,我们将会讨论它与Java的集成,具体来讲是与Quarkus Java栈的集成。

Quarkus是什么?

Quarkus是一个全栈、Kubernetes原生的Java框架,适用于Java虚拟机(JVM)和原生编译环境,专门为容器中运行的Java进行了优化,使其成为Serverless、云和Kubernetes环境下的高效平台。

Quarkus没有重复发明轮子,而是使用了由标准/规范支撑的知名企业级框架,并且使它们能够借助GraalVM编译为二进制文件。

如何在Quarkus中集成Kafka?

Quarkus使用SmallRye Reactive Messaging项目实现与Apache Kafka的交互。

Quarkus入门

要开始使用Quarkus,最快捷的方式就是通过其初始化页面添加所需的依赖。每个服务可能会有不同的依赖,我们可以在Java 11或Java 17之间选择。要实现Quarkus与Kafka的集成,我们需要添加SmallRye Reactive Messaging - Kafka Connector扩展。

要开发的应用

假设我们是一家影视流媒体公司,其中有个用例就是保存电影。这的确可以通过传统数据库来实现,但是考虑到要实现良好的用户体验需要实时互动,因此我们决定将它们存储在Kafka中。

所以,我们会有两个服务,其中一个服务会在用户停止播放电影的时候生成一个事件,另外一个服务则会消费这些事件,并以服务器事件的方式对其进行展示和流式处理。

下图展示了应用的架构:

接下来,我们使用Quarkus实现这些服务并阐述一些内部的细节。

电影播放的生产者(Movie Plays Producer)

每当用户停止播放电影的时候,该服务会向Kafka PlaytimeMovies主题发送一个事件。该事件包含了电影的ID以及观看的总时间。为了便于显示,我们将会使用一个定时器自动触发模拟用户观看电影的逻辑。

当该服务启动的时候,它将会生成一些电影到Kafka Movies主题中。

创建项目

导航至Quarkus的初始页面并选择smallrye-reactive-messaging-kafka以便于集成Kafka。然后,选择Jackson扩展,用于实现事件在JSON和Java的对象-字节数组之间进行编排/解排。同时,取消选中生成Started Code的选项。

如下面的截图所示:

你也可以跳过这个手动的步骤并导航至Kafka Quarkus Generator链接,在这里,所有的内容都已经选择好了。然后,点击Generate your application按钮,以下载应用骨架的压缩文件。

解压文件,并在你最喜欢的IDE中打开项目。

开发

我们创建两个POJO,其中一个代表Movie,另外一个代表PlayedMovie

  1. public class Movie {
  2. public int id;
  3. public String name;
  4. public String director;
  5. public String genre;
  6. public Movie(int id, String name, String director, String genre) {
  7. this.id = id;
  8. this.name = name;
  9. this.director = director;
  10. this.genre = genre;
  11. }
  12. }

Movie包含了电影的idnamedirectorgenre

  1. public class PlayedMovie {
  2. public int id;
  3. public long duration;
  4. public MoviePlayed(int id, long duration) {
  5. this.id = id;
  6. this.duration = duration;
  7. }
  8. }

PlayedMovie包含了idduration字段,分别代表了已播放电影的标识符以及用户观看的时长。

我们还需要一个新的类MovieKafkaGenerator,它负责将电影存储到Kafka主题中并模拟播放电影。

想要向主题上发布事件,我们首先需要两个类,分别是@Outgoing注解和Record类,@Outgoing用来以通道(channel)的形式指定要将事件发送至何处,在这里我们将其配置为指向Movies主题,Record代表了事件的包装器,以键/值的方式进行声明。

现在,我们创建MovieKafkaGenerator类,它会生成电影到Kafka Movies主题中。

  1. package org.acme.movieplays;
  2. import java.time.Duration;
  3. import java.util.List;
  4. import java.util.Random;
  5. import javax.enterprise.context.ApplicationScoped;
  6. import org.eclipse.microprofile.reactive.messaging.Outgoing;
  7. import io.smallrye.mutiny.Multi;
  8. import io.smallrye.reactive.messaging.kafka.Record;
  9. @ApplicationScoped
  10. public class MovieKafkaGenerator {
  11. private List<Movie> movies = List.of(
  12. new Movie(1, "The Hobbit", "Peter Jackson", "Fantasy"),
  13. new Movie(2, "Star Trek: First Contact", "Jonathan Frakes", "Space"),
  14. new Movie(3, "Encanto", "Jared Bush", "Animation"),
  15. new Movie(4, "Cruella", "Craig Gillespie", "Crime Comedy"),
  16. new Movie(5, "Sing 2", "Garth Jennings", "Jukebox Musical Comedy")
  17. );
  18. // 填充电影到Kafka主题中
  19. @Outgoing("movies")
  20. public Multi<Record<Integer, Movie>> movies() {
  21. return Multi.createFrom().items(movies.stream()
  22. .map(m -> Record.of(m.id, m))
  23. );
  24. }
  25. }

在这个类中,有几件重要的事情需要注意:

最后一步就是配置连接至Kafka实例的Quarkus参数。Quarkus应用可以通过src/main/resources/目录下的application.properties文件进行配置。

通过如下的通用属性,我们可以很容易地配置通道和主题之间的关系:

  1. mp.messaging.outgoing.<channel_name>.topic=<topic>

在我们的样例中,定义如下所示:

  1. mp.messaging.outgoing.movies.topic=movies

你可能想知道Kafka broker的位置是在哪里配置的呢?对于本地开发来说,我们不需要进行配置,因为Quarkus为Kafka提供了Dev Services特性。Dev Services会提供所需的外部依赖的实例(如数据库实例、Kafka broker、Keycloak服务等),这些实例会在容器运行时中提供,比如Podman或者其他兼容OCI的工具。从开发者的角度来说,如果我们包括了某个扩展但是并没有对其进行配置的话,Quarkus会自动启动服务,并配置应用来使用它。

基于此,我们在开发过程中,并不需要其他的配置参数。Quarkus就会帮助我们实现这一点。

重要提示:要在运行样例,我们需要在本地机器上有一个正在运行的Docker主机。如果没有本地没有Docker的话,那就需要有一个部署好的Kafka broker,我们将会在后文介绍如何在Quarkus中配置这个“远程”实例。

Docker主机启动并运行起来之后,在终端中以Quarkus dev模式下启动应用:

  1. ./mvnw compile quarkus:dev

终端输出如下所示:

  1. [INFO] Scanning for projects...
  2. [INFO]
  3. [INFO] -------------------&lt; org.acme:movie-plays-producer &gt;--------------------
  4. [INFO] Building movie-plays-producer 1.0.0-SNAPSHOT
  5. [INFO] --------------------------------[ jar ]---------------------------------
  6. ….
  7. 2022-03-21 11:37:24,474 INFO [io.qua.sma.dep.processor] (build-8) Configuring the channel 'movies' to be managed by the connector 'smallrye-kafka'
  8. 2022-03-21 11:37:24,483 INFO [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-30) Generating Jackson serializer for type org.acme.movieplays.Movie
  9. --
  10. --
  11. Checking Docker Environment 2022-03-21 11:37:25,018 INFO [org.tes.uti.ImageNameSubstitut
  12. --
  13. 2022-03-21 11:37:28,500 INFO [io.qua.kaf.cli.dep.DevServicesKafkaProcessor] (build-22) Dev Services for Kafka started. Other Quarkus applications in dev mode will find the broker automatically. For Quarkus applications in production mode, you can connect to this by starting your application with -Dkafka.bootstrap.servers=OUTSIDE://localhost:32769
  14. 2022-03-21 11:37:29,581 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 6.666s.
  15. 2022-03-21 11:37:29,582 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
  16. 2022-03-21 11:37:29,582 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

应用首先会被编译,然后自动配置Jackson序列化器(还记得我们在开始的时候就添加了该扩展),从而将Movie对象序列化为字节数组以存储在Kafka主题中。随后,Kafka broker会自动在localhost:32769启动,应用会自动配置连接到此处。最后,应用启动并运行,所有的电影会被插入到Kafka Movies主题中。

我们可以使用kcat工具来探查主题的内容。在终端窗口运行如下的命令,记得替换成你的Kafka broker地址:

  1. kcat -b localhost:32769 -t movies -C -K:
  2. :{"id":1,"name":"The Hobbit","director":"Peter Jackson","genre":"Fantasy"}
  3. :{"id":2,"name":"Star Trek: First Contact","director":"Jonathan Frakes","genre":"Space"}
  4. :{"id":3,"name":"Encanto","director":"Jared Bush","genre":"Animation"}
  5. :{"id":4,"name":"Cruella","director":"Craig Gillespie","genre":"Crime Comedy"}
  6. :{"id":5,"name":"Sing 2","director":"Garth Jennings","genre":"Jukebox Musical Comedy"}
  7. % Reached end of topic movies [0] at offset 5

停掉应用,我们接下来添加生成电影播放的内容。

打开MovieKafkaGenerator类并添加如下代码:

  1. private Random random = new Random();
  2. @Inject
  3. Logger logger;
  4. @Outgoing("play-time-movies")
  5. public Multi<Record<String, PlayedMovie>> generate() {
  6. return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
  7. .onOverflow().drop()
  8. .map(tick -> {
  9. Movie movie = movies.get(random.nextInt(movies.size()));
  10. int time = random.nextInt(300);
  11. logger.info("movie {0} played for {1} minutes", movie.name, time);
  12. // Region作为key
  13. return Record.of("eu", new PlayedMovie(movie.id, time));
  14. });
  15. }

在这个新方法中,有几件重要的事情需要注意:

最后,打开application.properties文件并配置新的通道:

  1. mp.messaging.outgoing.play-time-movies.topic=playtimemovies

再次启动应用,现在应用每秒钟都会生成一个新的事件。

  1. ./mvnw compile quarkus:dev
  1. 2022-03-21 12:36:01,297 INFO [io.sma.rea.mes.kafka] (Quarkus Main Thread) SRMSG18258: Kafka producer kafka-producer-play-time-movies, connected to Kafka brokers 'OUTSIDE://localhost:32771', is configured to write records to 'playtimemovies'
  2. 2022-03-21 12:36:01,835 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 148 minutes
  3. 2022-03-21 12:36:02,336 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 288 minutes
  4. 2022-03-21 12:36:02,836 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 176 minutes

每当有新事件发布时,控制台都会展示日志。现在,我们使用kcat来探查主题的内容。

  1. kcat -b localhost:32773 -t playtimemovies -C -K:
  2. eu:{"id":4,"duration":88}
  3. eu:{"id":3,"duration":291}
  4. eu:{"id":1,"duration":228}
  5. eu:{"id":2,"duration":165}
  6. eu:{"id":1,"duration":170}
  7. eu:{"id":4,"duration":75}

电影播放的消费者(Movie Plays Consumer)

该服务负责消费来自Kafka主题的事件。被消费的事件是以HTTP服务器端事件的形式流向调用者的。事件是playedmovie数据,其中包含了播放的电影ID以及已观看的总时长。

创建项目

导航至Quarkus的初始页面,选择resteasy-reactive-jackson来实现JAX-RS反应式端点,借助Jackson实现对Java对象和JSON之间的编排/解排,并选择smallrye-reactive-messaging-kafka扩展实现与Kafka的集成。同时,取消选中Started Code生成选项。

同样,你可以跳过这个手动的步骤并导航至Kafka Quarkus Generator链接,在这里,所有的内容都已经选择好了。然后,点击Generate your application按钮,以下载应用骨架的压缩文件。

解压文件,并在你最喜欢的IDE中打开项目。

开发

该服务会处理PlayedMovie事件,所以我们为该元素创建一个简单的POJO:

  1. public class PlayedMovie {
  2. public int id;
  3. public long duration;
  4. public MoviePlayed(int id, long duration) {
  5. this.id = id;
  6. this.duration = duration;
  7. }
  8. }

然后,创建名为PlayedMovieResource的新类,并创建一个JAX-RS反应式端点,以便于流入来自Kafka主题的事件。

  1. import javax.ws.rs.GET;
  2. import javax.ws.rs.Path;
  3. import javax.ws.rs.Produces;
  4. import javax.ws.rs.core.MediaType;
  5. import org.eclipse.microprofile.reactive.messaging.Channel;
  6. import io.smallrye.mutiny.Multi;
  7. @Path("/movies")
  8. public class PlayedMovieResource {
  9. @Channel("played-movies")
  10. Multi<MoviePlayed> playedMovies;
  11. @GET
  12. @Produces(MediaType.SERVER_SENT_EVENTS)
  13. public Multi<PlayedMovie> stream() {
  14. return playedMovives;
  15. }
  16. }

这是一个很小的类,但是做了很多的事情:

最后,在application.properties文件中对通道进行配置,在这里我们配置了通道(包括主题和偏移策略)并且将监听端口修改为9090,这样的话就不会与监听8080端口的生产者服务冲突。

  1. mp.messaging.incoming.movies-played.topic=playtimemovies
  2. mp.messaging.incoming.movies-played.auto.offset.reset=earliest
  3. %dev.quarkus.http.port=9090

在终端窗口中启动并运行movie-player-producer服务后,我们启动movie-player-consumer。在一个新的终端窗口中,以dev模式运行服务。

  1. ./mvnw compile quarkus:dev`
  1. [INFO] Scanning for projects...
  2. [INFO]
  3. [INFO] -------------------< org.acme:movie-plays-consumer >--------------------
  4. [INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
  5. [INFO] --------------------------------[ jar ]---------------------------------
  6. ….
  7. 2022-03-21 17:59:08,079 INFO [io.qua.sma.dep.processor] (build-13) Configuring the channel 'movies-played' to be managed by the connector 'smallrye-kafka'
  8. 2022-03-21 17:59:08,092 INFO [io.qua.sma.rea.kaf.dep.SmallRyeReactiveMessagingKafkaProcessor] (build-33) Generating Jackson deserializer for type org.acme.movieplays.MoviePlayed
  9. ….
  10. 2022-03-21 17:59:10,617 INFO [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18257: Kafka consumer kafka-consumer-movies, connected to Kafka brokers 'localhost:32771, belongs to the 'movie-plays-consumer'
  11. ….
  12. 2022-03-21 17:59:10,693 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-consumer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.114s. Listening on: http://localhost:9090
  13. 2022-03-21 17:59:10,693 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
  14. 2022-03-21 17:59:10,694 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, resteasy-reactive, resteasy-reactive-jackson, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]

应用首先会被编译,然后自动配置Jackson序列化器(还记得我们在开始的时候就添加了该扩展),从而能够将存储在Kafka主题中的字节数组对象反序列化为Java对象。运行中的应用会探测到已经启动的Kafka集群并自动进行连接。最后,应用会在9090端口启动。

在新的窗口中,运行如下的命令,获取流式数据:

  1. curl -N localhost:9090/movies
  1. data:{"id":4,"duration":213}
  2. data:{"id":4,"duration":3}
  3. data:{"id":3,"duration":96}
  4. data:{"id":5,"duration":200}
  5. data:{"id":2,"duration":234}
  6. data:{"id":1,"duration":36}
  7. data:{"id":1,"duration":162}
  8. data:{"id":3,"duration":88}

我们可以观察到来自Kafka主题的数据是如何自动进行流式处理并以HTTP请求的形式发出的。

上述的样例达成了两个目的,首先以Multi的形式注入一个通道,以便于接收事件,然后将这些事件发送至带有@Incoming注解的方法中。

停止消费者服务并添加如下的代码片段至PlayedMovieResource类中,以消费来自Kafka Movie主题的事件:

  1. import org.eclipse.microprofile.reactive.messaging.Incoming;
  2. import org.jboss.logging.Logger;
  3. @Inject
  4. Logger logger;
  5. @Incoming("movies")
  6. public void newMovie(Movie movie) {
  7. logger.infov("New movie: {0}", movie);
  8. }

在本例中,每当有新的电影发布至movies通道(Movies主题)时,newMovie()方法就会被调用。方法的参数就是Kafka主题的负载。

application.properties文件中配置通道,使其指向Movies主题。

  1. mp.messaging.incoming.movies.topic=movies
  2. mp.messaging.incoming.movies.auto.offset.reset=earliest

现在,再次启动movie-plays-consumer服务,我们会发现有些日志行打印出了电影的列表:

  1. ./mvnw compile quarkus:dev
  1. [INFO] Scanning for projects...
  2. [INFO]
  3. [INFO] -------------------< org.acme:movie-plays-consumer >---------------------
  4. [INFO] Building movie-plays-consumer 1.0.0-SNAPSHOT
  5. [INFO] --------------------------------[ jar ]---------------------------------
  6. 2022-03-21 17:59:12,146 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-13) SRMSG18256: Initialize record store for topic-partition 'movies-0' at position -1.
  7. 2022-03-21 17:59:12,160 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
  8. 2022-03-21 17:59:12,164 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
  9. 2022-03-21 17:59:12,165 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
  10. 2022-03-21 17:59:12,166 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
  11. 2022-03-21 17:59:12,167 INFO [org.acm.mov.MoviePlayedResource] (pool-1-thread-1) New movie: Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]

外部的Kafka Broker

你也可以使用自己的Kafka broker,只需要在application.properties文件中配置kafka.bootstrap.servers属性即可。

  1. kafka-bootstrap.servers=kafka:9092

结论

到目前为止,连接Quarkus应用到Apache Kafka并通过主题生产和消费消息/事件是非常容易的。消费Kafka消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)的话,又该怎么办呢?如果我们需要对事件做一些关联,又该怎么办呢(比如,playedmovie事件中包含电影的id,但是我们该如何与Movie主题联合起来获取电影的名字呢)?

当然,我们可以编写专门的代码来操作所有发送的数据。不过,Kafka Streams项目能够帮助我们在事件生成的时候来消费事件流,从而进行任意的转换、流连接等操作,并且有选择性地将新的数据写回到主题中。

Kafka Streams是一个很大的话题,它在解决实时处理问题上的各种功能令人印象深刻。我们将会用一篇专门的文章介绍Kafka Streams和Quarkus,敬请关注。

关于作者

**Alex Soto **

Alex Soto是红帽公司的开发者体验总监。他对Java领域、软件自动化充满热情,他相信开源软件模式。Soto是Manning的《Testing Java Microservices》和O’Reilly的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是Java Champion,是国际演讲者和Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ⚛️),随时了解 Kubernetes 和 Java 领域的动态。

查看英文原文:Getting Started to Quarkus Reactive Messaging with Apache Kafka

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注