[关闭]
@levinzhang 2022-08-20T01:36:09.000000Z 字数 14965 阅读 260

Kafka Streams与Quarkus:实时处理事件

by

摘要:

消费Kafka消息是很容易的,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(过滤、连接或操作事件)的话,仅仅使用Kafka的消费API可能就不是最好的方式了,因为这将导致代码变得非常复杂。在实时处理Kafka事件方面,Kafka Streams和Quarkus是绝配。


核心要点

在本系列的第一部分中,我们学习了Apache KafkaQuarkus的集成,并开发了一个简单的应用,从两个Kafka主题生产和消费事件。

在那个样例中,我们模拟了一个影视流公司,在一个Kafka主题中存储电影信息,在另外一个主题中存储了用户停止观看电影时所发生的每个事件,并捕获了电影已播放的时间。

下图展示了该应用的架构:

我们可以看到,消费消息很简单,只要有消息生成,我们就可以得到它们,但是除此之外,我们也做不了其他的事情了。如果我们需要实时处理数据(比如过滤或操作事件)或者我们需要在事件之间做一些关联,单纯使用Kafka的消费API可能就不是最佳的方式了,因为这会导致代码非常复杂。

Kafka Streams

Kafka Streams项目能够帮助我们在事件产生时实时消费它们,应用各种转换,执行流连接等,并且可以选择性地将新的数据表述写回主题中。

对于有状态和无状态的流应用来说,Kafka Streams都是理想方案,它能够实现基于时间的操作(例如,围绕给定的时间段对事件进行分组),并且考虑到了Kafka生态系统中普遍存在的可扩展性、可靠性和可维护性。

Kafka Stream由三个元素组成,即输入(源处理器)、输出(sink处理器)和处理器(流处理器)。

源处理器(Source processor):源处理器代表一个Kafka主题。源处理器会发送事件到一个或多个流处理器中。

流处理器(Stream processor):流处理器会将转换/逻辑应用于输入流中,比如连接、分组、计数、映射等。流处理器可以连接至另一个流处理器和/或sink处理器。

Sink处理器(Sink processor):Sink处理器代表了输出的数据,它会连接至一个Kafka主题。

拓扑结构(topology)是由源、处理器和sink组成的无循环图,事件会传入到一个Kafka Streams中,该实例将开始拓扑结构的执行。

Kafka Streams和Quarkus

Quarkus使用Quarkus KStreams扩展实现与Kafka Streams集成。

Quarkus起步

使用Quarkus最快捷的方式是通过初始化页面添加所需的依赖。每个服务可能需要不同的依赖,你可以选择Java 11或Java 17。为了实现Quarkus与Kafka Streams的集成,我们至少需要添加Kafka Streams扩展。

要开发的应用

正如在本文开始时所提到的,在本系列的第一部分中,我们开发了一个影视流公司,它有两个Kafka主题,其中一个用来存储电影的列表,另外一个主题会在用户停止播放电影时存储用户所在的区域(事件的键),并且会以电影id和播放时间作为事件的值。

所有的这些逻辑都是在名为Movie Plays Producer的生成者服务中创建的,该服务是使用Quarkus开发的。

除此之外,我还使用Quakus开发了一个Movie Plays Consumer服务,它会消费这两个主题的事件并且会在控制台上展示它们(并实现了HTTP服务器端事件)。

但是,这里没有对数据进行任何处理,它只是按照原样进行了接收。如果我们想要在movies和playtimemovies主题之间进行一下连接,在获取电影播放时长的时候得到电影的详细信息而不是id的话,那又该怎么办呢?

如果仅仅使用Kafka消息来实现这样的逻辑会变成一项很复杂的任务,因为我们需要在一个Map中存储Movie信息,并且在每个playedmovie事件发生时,进行匹配处理。

Movie Plays KStream

与其为每个用例手工编写代码,不如看一下如何使用Kafka Streams,以及它是如何与Quarkus集成来解决这个问题的。

创建项目

导航至Quarkus的初始化页面,并选择Apache Kafka Streams扩展来实现与Kafka Streams的集成。然后,选择RestEasy和RestEasy Jackson扩展实现事件从Java对象和JSON之间的编排/解排。同时,取消选中Started Code生成选项。

请参照下面的截图:

你也可以跳过这个手动的步骤并导航至Kafka Stream Quarkus Generator链接,在这里,所有的依赖都已经选择好了。然后,点击Generate your application按钮,以下载应用骨架的压缩文件。

解压文件,并在你最喜欢的IDE中打开项目。

开发

当开发Kafka Stream应用时,我们需要做的第一件事就是创建Topology实例,并定义源、处理器和sink。

在Quarkus中,我们只需要创建一个CDI类,这个类需要包含一个返回Topology实例的方法。

创建名为TopologyProducer的类,它将会实现从这两个主题消费事件并连接它们的逻辑。最后,生成的结果将会发送至一个sink处理器,该处理器以控制台输出的形式展示结果。

还有一个元素我们没有提到,在这些场景中它非常有用,那就是Kafka Tables。

一个主题可以包含具有相同键的多个事件。例如,我们可以使用某个键插入一个电影,然后我们可以使用相同的键创建一个新的事件来对电影进行更新:

但是,如果我们想要让movies主题与playtimemovies主题进行连接的话,那我们该选择使用哪个值为1的事件呢?第一个还是第二个?在这个具体的情况中,应该选择最新的那一个,因为它包含了电影的最新版本。为了获取每个事件的最新版本,Kafka Streams有一个的概念(KTable/GlobalKTable)。

Kafka Streams会浏览指定的主题,获取每个事件的最新版本,并将其放到一个表实例中。

KafkaStream扩展并不会像Kafka Messaging集成那样自动注册SerDes,所以我们需要在拓扑中手动注册它们。

  1. package org.acme;
  2. import javax.enterprise.context.ApplicationScoped;
  3. import javax.enterprise.inject.Produces;
  4. import org.apache.kafka.common.serialization.Serdes;
  5. import org.apache.kafka.streams.KeyValue;
  6. import org.apache.kafka.streams.StreamsBuilder;
  7. import org.apache.kafka.streams.Topology;
  8. import org.apache.kafka.streams.kstream.Consumed;
  9. import org.apache.kafka.streams.kstream.GlobalKTable;
  10. import org.apache.kafka.streams.kstream.KStream;
  11. import org.apache.kafka.streams.kstream.Printed;
  12. import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
  13. @ApplicationScoped
  14. public class TopologyProducer {
  15. private static final String MOVIES_TOPIC = "movies";
  16. private static final String PLAY_MOVIES_TOPIC = "playtimemovies";
  17. @Produces
  18. public Topology getTopCharts() {
  19. final StreamsBuilder builder = new StreamsBuilder();
  20. // 用于Movie和PlayedMovie的SerDes
  21. final ObjectMapperSerde<Movie> movieSerder = new ObjectMapperSerde<>(Movie.class);
  22. final ObjectMapperSerde<MoviePlayed> moviePlayedSerder = new ObjectMapperSerde<>(MoviePlayed.class);
  23. // 为Movies主题创建一个Global Kafka Table
  24. final GlobalKTable<Integer, Movie> moviesTable = builder.globalTable(
  25. MOVIES_TOPIC,
  26. Consumed.with(Serdes.Integer(), movieSerder));
  27. // 连接至playtimemovies主题的流,每当该主题有事件生成都会被该流所消费
  28. final KStream<String, MoviePlayed> playEvents = builder.stream(
  29. PLAY_MOVIES_TOPIC, Consumed.with(Serdes.String(), moviePlayedSerder));
  30. // PlayedMovies使用区域作为键,对象作为值。我们对内容进行map操作,让电影的id作为key(以便于进行连接)并让对象继续作为值
  31. // 另外,我们使用movies表的键(movieId)以及流的键(在前面的map方法中,我们也将其变成了movieId)进行连接
  32. // 最后,结果会流向控制台
  33. playEvents
  34. .map((key, value) -> KeyValue.pair(value.id, value)) // Now key is the id field
  35. .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
  36. .print(Printed.toSysOut());
  37. return builder.build();
  38. }
  39. }

MovieMoviePlayed POJO包含了实现逻辑所需的属性:

Movie对象如下所示:

  1. package org.acme;
  2. public class Movie {
  3. public int id;
  4. public String name;
  5. public String director;
  6. public String genre;
  7. public Movie(int id, String name, String director, String genre) {
  8. this.id = id;
  9. this.name = name;
  10. this.director = director;
  11. this.genre = genre;
  12. }
  13. }

MoviePlayed对象如下所示:

  1. package org.acme;
  2. public class MoviePlayed {
  3. public int id;
  4. public long duration;
  5. public MoviePlayed(int id, long duration) {
  6. this.id = id;
  7. this.duration = duration;
  8. }
  9. }

运行Kafka Stream应用之前的最后一步是配置参数,其中最重要的是quarkus.kafka-streams.topics。它是一个主题列表,在拓扑结构开始处理数据之前,它们就要存在于Kafka集群中,这是一个前提条件。

打开src/main/resources/application.properties文件并添加如下的代码行:

  1. kafka-streams.cache.max.bytes.buffering=10240
  2. kafka-streams.commit.interval.ms=1000
  3. kafka-streams.metadata.max.age.ms=500
  4. kafka-streams.auto.offset.reset=earliest
  5. kafka-streams.metrics.recording.level=DEBUG
  6. quarkus.kafka-streams.topics=playtimemovies,movies

现在,我们可以测试一下流了。我们启动在上一篇文章中开发的生产者。生产者的源码可以在这里找到。

Quarkus KStreams集成了Quarkus DevServices。所以,我们不需要启动Kafka集群,也不需要配置它的位置,因为Quarkus Dev模式会处理好所有的事情。我们只需要记住在自己的计算机上要有一个运行中的容器环境即可,比如Podman或其他兼容OCI的工具。

在终端窗口中启动生产者服务:

  1. cd movie-plays-producer
  2. ./mvnw compile quarkus:dev
  3. 2022-04-11 07:49:31,900 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Cruella played for 287 minutes
  4. 2022-04-11 07:49:31,941 INFO [io.quarkus] (Quarkus Main Thread) movie-plays-producer 1.0.0-SNAPSHOT on JVM (powered by Quarkus 2.7.3.Final) started in 4.256s.
  5. 2022-04-11 07:49:31,942 INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
  6. 2022-04-11 07:49:31,943 INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, kafka-client, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, vertx]
  7. 2022-04-11 07:49:32,399 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 162 minutes
  8. 2022-04-11 07:49:32,899 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 255 minutes
  9. 2022-04-11 07:49:33,404 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Sing 2 played for 264 minutes
  10. 2022-04-11 07:49:33,902 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Encanto played for 28 minutes
  11. 2022-04-11 07:49:34,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 137 minutes
  12. 2022-04-11 07:49:34,903 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie Star Trek: First Contact played for 277 minutes
  13. 2022-04-11 07:49:35,402 INFO [org.acm.mov.MovieKafkaGenerator] (executor-thread-0) movie The Hobbit played for 141 minutes

在另一个终端窗口中,启动我们刚刚开发的Kafka Stream代码:

  1. ./mvnw compile quarkus:dev
  2. 2022-04-11 07:54:59,321 INFO [org.apa.kaf.str.pro.int.StreamTask] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] task [1_0] Restored and ready to run
  3. 2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] Restoration took 74 ms for all tasks [1_0]
  4. 2022-04-11 07:54:59,322 INFO [org.apa.kaf.str.pro.int.StreamThread] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-thread [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
  5. 2022-04-11 07:54:59,324 INFO [org.apa.kaf.str.KafkaStreams] (movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea-StreamThread-1) stream-client [movie-plays-kstreams-22c86daa-cd28-4956-9d0d-57b6b282a2ea] State transition from REBALANCING to RUNNING
  6. [KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
  7. [KSTREAM-LEFTJOIN-0000000005]: 2, Movie [director=Jonathan Frakes, genre=Space, id=2, name=Star Trek: First Contact]
  8. [KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
  9. [KSTREAM-LEFTJOIN-0000000005]: 1, Movie [director=Peter Jackson, genre=Fantasy, id=1, name=The Hobbit]
  10. [KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
  11. [KSTREAM-LEFTJOIN-0000000005]: 4, Movie [director=Craig Gillespie, genre=Crime Comedy, id=4, name=Cruella]
  12. [KSTREAM-LEFTJOIN-0000000005]: 3, Movie [director=Jared Bush, genre=Animation, id=3, name=Encanto]
  13. [KSTREAM-LEFTJOIN-0000000005]: 5, Movie [director=Garth Jennings, genre=Jukebox Musical Comedy, id=5, name=Sing 2]

在输出中打印的是事件(连接所生成的结果),其中键是movieId,值是movie本身。我们现在所具有的功能是,每当一个电影停止播放时,Kafka Stream会对其进行处理并以Movie全量信息的形式对其进行展现。

到目前为止,还不算复杂,对于这样的场景,你可能会想我们根本没有必要使用Kafka Streams。但是,我们再加一些需求,这样你就能看到它的强大之处了。

现在,我们不是在用户每次停掉电影的时候都生成事件,而是只对用户观看时间超过10分钟的电影发送事件。

我们可以使用filter方法按照持续时长进行过滤。

  1. playEvents
  2. .filter((region, event) -> event.duration >= 10) // filters by duration
  3. .map((key, value) -> KeyValue.pair(value.id, value))
  4. .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
  5. .print(Printed.toSysOut());

重启应用,我们可以发现观看时长小于10分钟的电影将不会被处理。

我们发现,Kafka Streams有助于代码的整洁性,接下来,我们添加最后的需求。现在,我们对每部电影的播放时长并不感兴趣,而是对每部电影有多少次超过10分钟的播放感兴趣。

到目前为止,对事件的处理都是无状态的,因为事件都是遵循这样的步骤,即接收、处理并发送至sink处理器(也就是发送至一个主题或控制台输出),但是,为了统计某部电影播放的次数,我们需要在内存记住电影被播放了多少次,并且当任意用户再次观看超过10分钟的时候,要对这个统计数字递增一次。此时,事件的处理就要以有状态的方式进行了。

我们需要做的第一件事就是创建一个Java类,以存储电影的名称及其播放的次数。

  1. public class MoviePlayCount {
  2. public String name;
  3. public int count;
  4. public MoviePlayCount aggregate(String name) {
  5. this.name = name;
  6. this.count++;
  7. return this;
  8. }
  9. @Override
  10. public String toString() {
  11. return "MoviePlayCount [count=" + count + ", name=" + name + "]";
  12. }
  13. }

这是一个计数器类,它依然需要两样东西:

关于第一个问题,我们需要使用KeyValueBytesStoreSupplier接口。

  1. public static final String COUNT_MOVIE_STORE = "countMovieStore";
  2. KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(COUNT_MOVIE_STORE);

对于第二个问题,Kafka Streams有一个用来聚合结果的aggregate方法。

在我们的用例中,也就是每部电影播放时长超过10分钟的次数。

  1. // MoviePlayCount可以被序列化和反序列化
  2. ObjectMapperSerde<MoviePlayCount> moviePlayCountSerder = new ObjectMapperSerde<>(MoviePlayCount.class);
  3. // 这是之前的连接操作,其中键是电影id,值是电影
  4. .join(moviesTable, (movieId, moviePlayedId) -> movieId, (moviePlayed, movie) -> movie)
  5. // 根据键对事件进行分组,在本例中,也就是电影的id
  6. .groupByKey(Grouped.with(Serdes.Integer(), movieSerder))
  7. // 聚合方法,如果MoviePlayCount已经创建的话,获取该对象(如果尚未创建的话,会创建该实例)并调用其aggregate方法,以对观看次数进行递增
  8. .aggregate(MoviePlayCount::new,
  9. (movieId, movie, moviePlayCounter) -> moviePlayCounter.aggregate(movie.name),
  10. Materialized.<Integer, MoviePlayCount> as(storeSupplier)
  11. .withKeySerde(Serdes.Integer())
  12. .withValueSerde(moviePlayCountSerder)
  13. )

重启应用,在控制台上将会展示电影播放的次数。

提示:要重启应用,只需在终端输入“s”,应用将会自动重启。

应用重启之后,控制台将会展示每部电影的状态。

  1. [KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=13, name=Cruella]
  2. [KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=11, name=Encanto]
  3. [KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=14, name=Sing 2]
  4. [KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=15, name=Star Trek: First Contact]
  5. [KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=16, name=The Hobbit]
  6. [KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=16, name=Star Trek: First Contact]
  7. [KTABLE-TOSTREAM-0000000011]: 3, MoviePlayCount [count=12, name=Encanto]
  8. [KTABLE-TOSTREAM-0000000011]: 2, MoviePlayCount [count=17, name=Star Trek: First Contact]
  9. [KTABLE-TOSTREAM-0000000011]: 5, MoviePlayCount [count=15, name=Sing 2]
  10. [KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=14, name=Cruella]
  11. [KTABLE-TOSTREAM-0000000011]: 1, MoviePlayCount [count=17, name=The Hobbit]
  12. [KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=15, name=Cruella]
  13. [KTABLE-TOSTREAM-0000000011]: 4, MoviePlayCount [count=16, name=Cruella]

交互式查询

因为我们将sink处理器设置成了System.out流,所以聚合结果会流向控制台。

  1. .toStream()
  2. .print(Printed.toSysOut());

但是,我们也可以将结果流发往一个Kafka主题:

  1. .to("counter_movies", Produced.with(Serdes.Integer(), moviePlayCountSerder)
  2. );

但是,如果我们感兴趣的不是每次对新事件的反应,而只是想查询特定的电影此时播放的次数,那又该怎么办呢?

Kafka Streams的交互式查询(interactive query)允许我们直接查询底层存储,以获取给定键相关的值。

首先,我们创建一个名为MoviePlayCountData的类来存储查询结果。按照这种方式,我们可以解耦Kafka Streams使用的类与应用中其他部分所使用的类。

  1. public class MoviePlayCountData {
  2. private String name;
  3. private int count;
  4. public MoviePlayCountData(String name, int count) {
  5. this.name = name;
  6. this.count = count;
  7. }
  8. public int getCount() {
  9. return count;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. }

现在,创建名为InteractiveQueries的类来实现对状态存储(KeyValueBytesStoreSupplier)的访问并根据电影的id查询它被播放的次数。

  1. import javax.enterprise.context.ApplicationScoped;
  2. import javax.inject.Inject;
  3. import org.apache.kafka.streams.KafkaStreams;
  4. import org.apache.kafka.streams.errors.InvalidStateStoreException;
  5. import org.apache.kafka.streams.state.QueryableStoreTypes;
  6. import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  7. import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
  8. import java.util.Optional;
  9. @ApplicationScoped
  10. public class InteractiveQueries {
  11. @Inject
  12. KafkaStreams streams;
  13. public <MoviePlayCountData> getMoviePlayCountData(int id) {
  14. // 获取状态存储并根据电影id获取播放次数
  15. MoviePlayCount moviePlayCount = getMoviesPlayCount().get(id);
  16. // 如果有结果的话
  17. if (moviePlayCount != null) {
  18. // 将结果包装到MoviePlayCountData中
  19. return Optional.of(new MoviePlayCountData(moviePlayCount.name, moviePlayCount.count));
  20. } else {
  21. return Optional.empty();
  22. }
  23. }
  24. // 获取状态存储
  25. private ReadOnlyKeyValueStore<Integer, MoviePlayCount> getMoviesPlayCount() {
  26. while (true) {
  27. try {
  28. return streams.store(fromNameAndType(TopologyProducer.COUNT_MOVIE_STORE, QueryableStoreTypes.keyValueStore()));
  29. } catch (InvalidStateStoreException e) {
  30. // 忽略之,此时存储尚未就绪
  31. }
  32. }
  33. }
  34. }

现在,我们可以添加一个简单的REST端点来运行该查询。

  1. import java.util.Optional;
  2. import javax.inject.Inject;
  3. import javax.ws.rs.GET;
  4. import javax.ws.rs.Path;
  5. import javax.ws.rs.PathParam;
  6. import javax.ws.rs.core.Response;
  7. import javax.ws.rs.core.Response.Status;
  8. @Path("/movie")
  9. public class MovieCountResource {
  10. // 注入前文的类进行查询
  11. @Inject
  12. InteractiveQueries interactiveQueries;
  13. @GET
  14. @Path("/data/{id}")
  15. public Response movieCountData(@PathParam("id") int id) {
  16. Optional<MoviePlayCountData> moviePlayCountData = interactiveQueries.getMoviePlayCountData(id);
  17. // 根据结果判定返回值还是404
  18. if (moviePlayCountData.isPresent()) {
  19. return Response.ok(moviePlayCountData.get()).build();
  20. } else {
  21. return Response.status(Status.NOT_FOUND.getStatusCode(),
  22. "No data found for movie " + id).build();
  23. }
  24. }
  25. }

Kafka Stream实现的模式如下图所示:

扩展

Kafka Streams应用可以进行扩展,所以流会分布到多个实例中。在这种情况下,每个实例都包含聚合结果的一个子集,所以要想获得总的聚合结果,我们需要通过将REST API重定向到另外的实例来获取其他实例的数据。

Kafka Streams提供了一个API,可以知道要请求的数据是在本地Kafka Streams存储中还是在其他的主机中。

虽然这个过程并不复杂,但已经超出了本文的讨论范围。

结论

到目前为止,我们已经看到,将Quarkus应用连接到Apache Kafka并生产和消费主题中的消息/事件是很容易的。此外,还看到Kafka Streams让我们不仅可以消费Kafka中消息,还能够实时处理它们,进行转换、过滤等操作,例如以同步的方式消费结果数据。 这是一项强大的技术,当需要处理的数据不断变化时,它可以轻松扩展,提供实时的体验。

但是,我们还没有解决该架构的最后一个问题。通常情况下,数据并不是存储在一个地方。电影信息可能存储在关系型数据库中,而电影的播放信息则存储在一个Kafka主题中。那么,该如何保持这两个地方的信息更新,以便Kafka Streams能正确地连接数据呢?

这里有一个缺失的部分,名为Debezium的项目可以帮助我们解决这个问题。我们将用一整篇文章来介绍Debezium和Quarkus,敬请持续关注。

关于作者

Alex Soto

Alex Soto是红帽公司的开发者体验总监。他对Java领域、软件自动化充满热情,他相信开源软件模式。Soto是Manning的《Testing Java Microservices》和O’Reilly的《Quarkus Cookbook》两本书的共同作者,他还是多个开源项目的贡献者。自 2017 年以来,他一直是Java Champion,是国际演讲者和Salle URL 大学的教师。你可以在 Twitter 上关注他(Alex Soto ⚛️),随时了解 Kubernetes 和 Java 领域的动态。

查看英文原文:Kafka Streams and Quarkus: Real-Time Processing Events

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注