
Kafka Documentation (Translation)



Author: 老杨叔叔
Date: 2016-07-22

Table of Contents

English original

API doc

0. Related Resources

Broker cluster setup

Environment setup (another guide)

Comparison of various MQs

1. Getting Started

Kafka Connect starts with these simple example configuration files and creates two connectors: the first is a source connector that reads lines from a file and publishes each one to a topic, and the second is a sink connector that consumes messages from a topic and writes each one to a file. During startup you will see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector begins reading lines from test.txt and publishing them to the topic connect-test, while the sink connector begins reading messages from connect-test and writing them to the file test.sink.txt.
We can verify that the data has been delivered through the entire pipeline by examining the contents of test.sink.txt:
```
> cat test.sink.txt
foo
bar
```

Note that the data is being stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic:

```
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
```

The connectors continue to process data, so we can add a line to the source file and watch it move through the pipeline:

```
> echo "Another line" >> test.txt
```
Sure enough, you will find that "Another line" has already shown up in test.sink.txt. Pretty amazing, isn't it?
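
The same connect-test topic can also be read with custom consumer code instead of the console consumer. Below is a minimal sketch using the new Java consumer from org.apache.kafka.clients.consumer; the broker address and topic name come from this quickstart, while the class name and group id are illustrative assumptions:

```
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConnectTestReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // broker from the quickstart
        props.put("group.id", "connect-test-reader");        // illustrative group id
        props.put("auto.offset.reset", "earliest");           // new group starts from the beginning, similar to --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("connect-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Each value is the JSON envelope produced by the file source connector, as shown above.
                    System.out.println(record.value());
                }
            }
        }
    }
}
```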

### Step 8: Process data with Kafka Streams
Kafka Streams is a client library for real-time stream processing and analytics of data stored in Kafka. This quickstart example demonstrates how to run a streaming application written with this library. Here is the gist of the WordCountDemo example (using Java 8 lambda expressions):

```
KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts");

```

It implements the WordCount algorithm, which computes the number of occurrences of each word in the input text. Unlike the WordCount examples you may have seen before, which operate on bounded data, this one behaves slightly differently because it is designed to operate on an infinite, unbounded stream of data: it is a dynamic algorithm that continuously tracks and updates the word counts. Since it must assume the input is unbounded, and therefore cannot know when it has processed "all" of the data, it periodically outputs its current state and results.
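
For context, the snippet above omits its surrounding setup. The sketch below shows roughly what the complete program looks like with the Streams API of this Kafka version (0.10.x); the class name and application id are illustrative assumptions, while the topic names match this quickstart:

```
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");   // illustrative application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");   // required by 0.10.0 Streams
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream("streams-file-input");

        KTable<String, Long> wordCounts = textLines
            // Split each text line, by whitespace, into words.
            .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
            // Use the word itself as the record key for the count.
            .map((key, value) -> new KeyValue<>(value, value))
            // Count occurrences per word into a table named "Counts".
            .countByKey("Counts");

        // Write the continuously updated counts to the output topic read in the next step.
        wordCounts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}
```

The wordCounts table is emitted to streams-wordcount-output as a stream of updates, which is exactly what the console consumer reads in the steps below.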

Now we prepare some input data that will be sent to a Kafka topic and subsequently processed by the Kafka Streams application:

```
> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt
```

Next, we send this input data to the topic `streams-file-input` using the console producer (in practice, stream data would be flowing into Kafka continuously while the application is running):
```
  > bin/kafka-topics.sh --create \
              --zookeeper localhost:2181 \
              --replication-factor 1 \
              --partitions 1 \
              --topic streams-file-input
  > cat file-input.txt | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input

```

Now we can run the WordCount demo application to process the input data:

```
  > bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

```
Because the results are continuously written back to another topic, nothing is printed to STDOUT except log entries. Unlike a typical stream processing application, this demo runs for a few seconds and then terminates automatically.

We can now inspect the output of the WordCount demo by reading from its output topic:
```
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
        --topic streams-wordcount-output \
        --from-beginning \
        --formatter kafka.tools.DefaultMessageFormatter \
        --property print.key=true \
        --property print.value=true \
        --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
        --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

```
This prints the following data to the console:
```
    all     1
    streams 1
    lead    1
    to      1
    kafka   1
    hello   1
    kafka   2
    streams 2
    join    1
    kafka   3
    summit  1
```
Here, the first column is the Kafka message key and the second column is the message value, both in java.lang.String format. Note that the output is actually a continuous stream of updates, where each record (i.e., each line of the output above) is an updated count of a single word. For multiple records with the same key, each later record is an update of the previous one.
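
To make these update semantics concrete, the sketch below (an illustration, not part of the official demo) reads streams-wordcount-output with a String deserializer for keys and a Long deserializer for values, and keeps only the most recent count per word; the class name and group id are assumptions:

```
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LatestCounts {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "wordcount-reader");            // illustrative group id
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

        Map<String, Long> latest = new HashMap<>();           // word -> most recent count
        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
            for (int i = 0; i < 10; i++) {                    // poll a few times, then stop
                ConsumerRecords<String, Long> records = consumer.poll(1000);
                for (ConsumerRecord<String, Long> record : records) {
                    // A later record for the same word overwrites the earlier count.
                    latest.put(record.key(), record.value());
                }
            }
        }
        latest.forEach((word, count) -> System.out.println(word + "\t" + count));
    }
}
```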

Now you can write more input messages to the streams-file-input topic and observe additional messages being appended to the streams-wordcount-output topic, reflecting the updated word counts (you can use the console producer and console consumer described above to do so).

2. API

Apache Kafka includes a new Java client (in the org.apache.kafka.clients package), which is intended to replace the older Scala clients; the two will, however, coexist for some time. The new clients are available as a separate jar, while the old Scala clients remain packaged with the server.
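
As a small illustration of the new Java client, the sketch below publishes one message with org.apache.kafka.clients.producer.KafkaProducer; the topic name, key, value, and broker address are assumptions:

```
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewClientProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("acks", "all");                            // wait for the in-sync replicas to acknowledge
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous and returns a Future; closing the producer flushes buffered records.
            producer.send(new ProducerRecord<>("my-topic", "key-1", "hello from the new client"));
        }
    }
}
```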

3. Configuration

Kafka uses key-value pairs in the property file format for configuration. These values can be supplied either from a property file or programmatically.
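
For instance, a configuration can be loaded from a properties file and then adjusted programmatically before being handed to a client. A minimal sketch, where the file name consumer.properties and the overridden group id are assumptions:

```
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConfigLoadingSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();

        // Option 1: read key-value pairs from a properties file.
        try (InputStream in = new FileInputStream("consumer.properties")) {
            props.load(in);
        }

        // Option 2: set (or override) individual keys programmatically.
        props.put("group.id", "my-consumer-group");

        // The resulting Properties can then be passed to a KafkaConsumer or KafkaProducer constructor.
        System.out.println(props);
    }
}
```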

For more details about broker configuration, see the class kafka.server.KafkaConfig.

Topic-level configuration can either fall back to a global default or be set per topic. If a topic has no explicit configuration, the global default is used. An override can be supplied at topic creation time by passing one or more --config options. The following example creates a topic named my-topic with a custom maximum message size and flush rate:

```
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
```

Besides setting overrides at creation time, they can also be changed or added later with the --alter command. This example updates the maximum message size for my-topic:

```
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --config max.message.bytes=128000
```

A per-topic override can also be removed with --delete-config, which reverts the topic to the global default:

```
> bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic --delete-config max.message.bytes
```

The topic-level configurations are listed below. In the table, "Property" is the topic-level property name, "Default" is its default value (taken from the server default), and "Server Default Property" is the server-level property that supplies the global default. A server default applies to any topic that does not set its own override, and changing it changes the value used by all such topics.

| Property | Default | Server Default Property | Description |
|---|---|---|---|
| cleanup.policy | delete | log.cleanup.policy | A string that is either "delete" or "compact". This string designates the retention policy to use on old log segments. The default policy ("delete") will discard old segments when their retention time or size limit has been reached. The "compact" setting will enable log compaction on the topic. |
| delete.retention.ms | 86400000 (24 hours) | log.cleaner.delete.retention.ms | The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan). |
| flush.messages | None | log.flush.interval.messages | This setting allows specifying an interval at which we will force an fsync of data written to the log. For example if this was set to 1 we would fsync after every message; if it were 5 we would fsync after every five messages. In general we recommend you not set this and use replication for durability and allow the operating system's background flush capabilities as it is more efficient. This setting can be overridden on a per-topic basis (see the per-topic configuration section). |
| flush.ms | None | log.flush.interval.ms | This setting allows specifying a time interval at which we will force an fsync of data written to the log. For example if this was set to 1000 we would fsync after 1000 ms had passed. In general we recommend you not set this and use replication for durability and allow the operating system's background flush capabilities as it is more efficient. |
| index.interval.bytes | 4096 | log.index.interval.bytes | This setting controls how frequently Kafka adds an index entry to its offset index. The default setting ensures that we index a message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact position in the log but makes the index larger. You probably don't need to change this. |
| max.message.bytes | 1,000,000 | message.max.bytes | This is the largest message size Kafka will allow to be appended to this topic. Note that if you increase this size you must also increase your consumer's fetch size so they can fetch messages this large. |
| min.cleanable.dirty.ratio | 0.5 | log.cleaner.min.cleanable.ratio | This configuration controls how frequently the log compactor will attempt to clean the log (assuming log compaction is enabled). By default we will avoid cleaning a log where more than 50% of the log has been compacted. This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates). A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log. |
| min.insync.replicas | 1 | min.insync.replicas | When a producer sets acks to "all", min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of "all". This will ensure that the producer raises an exception if a majority of replicas do not receive a write. |
| retention.bytes | None | log.retention.bytes | This configuration controls the maximum size a log can grow to before we will discard old log segments to free up space if we are using the "delete" retention policy. By default there is no size limit only a time limit. |
| retention.ms | 7 days | log.retention.minutes | This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the "delete" retention policy. This represents an SLA on how soon consumers must read their data. |
| segment.bytes | 1 GB | log.segment.bytes | This configuration controls the segment file size for the log. Retention and cleaning is always done a file at a time so a larger segment size means fewer files but less granular control over retention. |
| segment.index.bytes | 10 MB | log.index.size.max.bytes | This configuration controls the size of the index that maps offsets to file positions. We preallocate this index file and shrink it only after log rolls. You generally should not need to change this setting. |
| segment.ms | 7 days | log.roll.hours | This configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn't full to ensure that retention can delete or compact old data. |
| segment.jitter.ms | 0 | log.roll.jitter.{ms,hours} | The maximum jitter to subtract from logRollTimeMillis. |

The following are the configuration settings for the Kafka consumer (the new Java consumer):

| Name | Description | Type | Default | Valid Values | Importance |
|---|---|---|---|---|---|
| bootstrap.servers | A list of Kafka broker host/port pairs used by the client to establish the initial connection to the cluster. The same setting as in the producer configuration. | list | | | high |
| key.deserializer | Deserializer class for keys implementing the Deserializer interface; the counterpart of the producer's key.serializer. | class | | | high |
| value.deserializer | Deserializer class for values implementing the Deserializer interface; the counterpart of the producer's value.serializer. | class | | | high |
| fetch.min.bytes | The minimum amount of data the server should return for a fetch request. If less data than this is available, the server will not answer immediately but will wait for more data to accumulate. The default of 1 byte means requests are answered as soon as any data is available. Raising this value can improve throughput at the cost of added latency. | int | 1 | [0,...] | high |
| group.id | A unique string that identifies the consumer group this consumer belongs to. Within a group, each message of a subscribed topic is consumed by only one member (mutual exclusion), which prevents duplicate consumption in a clustered deployment. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy. | string | "" | | high |
| heartbeat.interval.ms | The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances. | int | 3000 | | high |
| max.partition.fetch.bytes | The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be #partitions * max.partition.fetch.bytes. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition. | int | 1048576 | [0,...] | high |
| session.timeout.ms | The timeout used to detect failures when using Kafka's group management facilities. When a consumer's heartbeat is not received within the session timeout, the broker will mark the consumer as failed and rebalance the group. Since heartbeats are sent only when poll() is invoked, a higher session timeout allows more time for message processing in the consumer's poll loop at the cost of a longer time to detect hard failures. See also max.poll.records for another option to control the processing time in the poll loop. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms. | int | 30000 | | high |
| ssl.key.password | The password of the private key in the key store file. This is optional for client. | password | null | | high |
| ssl.keystore.location | The location of the key store file. This is optional for client and can be used for two-way authentication for client. | string | null | | high |
| ssl.keystore.password | The store password for the key store file. This is optional for client and only needed if ssl.keystore.location is configured. | password | null | | high |
| ssl.truststore.location | The location of the trust store file. | string | null | | high |
| ssl.truststore.password | The password for the trust store file. | password | null | | high |
| auto.offset.reset | What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw exception to the consumer if no previous offset is found for the consumer's group; anything else: throw exception to the consumer. | string | latest | [latest, earliest, none] | medium |
| connections.max.idle.ms | Close idle connections after the number of milliseconds specified by this config. | long | 540000 | | medium |
| enable.auto.commit | If true, the consumer's offsets are committed automatically in the background. Note that offsets are tracked per consumer but are not stored on the client itself (the client has nowhere durable to keep them, and local storage could not enforce exclusive consumption across a group); they are stored server-side (in ZooKeeper or Kafka's internal offsets topic), so the server can maintain them on the consumer's behalf. | boolean | true | | medium |
| exclude.internal.topics | Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to true the only way to receive records from an internal topic is subscribing to it. | boolean | true | | medium |
| max.poll.records | The maximum number of records returned in a single poll, i.e. an upper bound on the batch returned by one fetch. The default is very large; in production a smaller value is probably advisable (translator's guess). | int | 2147483647 | [1,...] | medium |
| partition.assignment.strategy | The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. | list | [org.apache.kafka.clients.consumer.RangeAssignor] | | medium |
| receive.buffer.bytes | The size of the TCP receive buffer used when reading data. | int | 65536 | [0,...] | medium |
| request.timeout.ms | The maximum amount of time the client will wait for a response to a request. If the timeout elapses, the client resends the request if necessary (where the retry count is configured is unclear -- translator's note), or fails once retries are exhausted. | int | 40000 | [0,...] | medium |
| sasl.kerberos.service.name | The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. | string | null | | medium |
| sasl.mechanism | SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. GSSAPI is the default mechanism. | string | GSSAPI | | medium |
| security.protocol | Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | | medium |
| send.buffer.bytes | The size of the TCP send buffer used when sending data. | int | 131072 | [0,...] | medium |
| ssl.enabled.protocols | The list of protocols enabled for SSL connections. | list | [TLSv1.2, TLSv1.1, TLSv1] | | medium |
| ssl.keystore.type | The file format of the key store file. This is optional for client. | string | JKS | | medium |
| ssl.protocol | The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. | string | TLS | | medium |
| ssl.provider | The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. | string | null | | medium |
| ssl.truststore.type | The file format of the trust store file. | string | JKS | | medium |
| auto.commit.interval.ms | The frequency in milliseconds at which offsets are auto-committed when enable.auto.commit is set to true, i.e. offsets are committed once every auto.commit.interval.ms. | long | 5000 | [0,...] | low |
| check.crcs | Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | boolean | true | | low |
| client.id | An id string passed to the server when making requests. Its purpose is to track the source of requests with a meaningful name (for example the application name) rather than just an ip/port. | string | "" | | low |
| fetch.max.wait.ms | The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes. | int | 500 | [0,...] | low |
| interceptor.classes | A list of consumer interceptors. Classes implementing the ConsumerInterceptor interface can intercept (and possibly modify) all records received by the consumer. By default there are no interceptors. | list | null | | low |
| metadata.max.age.ms | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. | long | 300000 | [0,...] | low |
| metric.reporters | A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. | list | [] | | low |
| metrics.num.samples | The number of samples maintained to compute metrics. | int | 2 | [1,...] | low |
| metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,...] | low |
| reconnect.backoff.ms | The amount of time to wait before attempting to reconnect after a connection fails, which avoids tight reconnect loops. This backoff applies to all requests sent by this consumer to the broker. | long | 50 | [0,...] | low |
| retry.backoff.ms | The amount of time to wait before retrying a failed request, which avoids repeatedly resending requests in a tight loop under some failure scenarios. | long | 100 | [0,...] | low |
| sasl.kerberos.kinit.cmd | Kerberos kinit command path. | string | /usr/bin/kinit | | low |
| sasl.kerberos.min.time.before.relogin | Login thread sleep time between refresh attempts. | long | 60000 | | low |
| sasl.kerberos.ticket.renew.jitter | Percentage of random jitter added to the renewal time. | double | 0.05 | | low |
| sasl.kerberos.ticket.renew.window.factor | Login thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket. | double | 0.8 | | low |
| ssl.cipher.suites | A list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported. | list | null | | low |
| ssl.endpoint.identification.algorithm | The endpoint identification algorithm to validate server hostname using server certificate. | string | null | | low |
| ssl.keymanager.algorithm | The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine. | string | SunX509 | | low |
| ssl.trustmanager.algorithm | The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine. | string | PKIX | | low |
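
To relate some of the settings above to code, the sketch below assembles a consumer configuration using the ConsumerConfig constants from the new Java client; the concrete values are illustrative, not recommendations:

```
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConfigSketch {
    public static Properties exampleConfig() {
        Properties props = new Properties();
        // high importance
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");       // wait for at least 1 KB per fetch
        // medium importance
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");   // commit offsets manually instead
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
        // low importance
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000"); // ignored when auto commit is disabled
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "example-app");
        return props;
    }
}
```

The resulting Properties object can be passed directly to the KafkaConsumer constructor.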

4. Design

5. Implementation

6. Operations

Here is some information on actually running Kafka as a production system, based on LinkedIn's usage and experience.

7. Security

8. Kafka Connect

9. Kafka Streams
