Apache Kafka: Overview and Operation Check

This article gives an overview of Apache Kafka and explains how to verify that it works.

Table of Contents


  1. Overview
  2. Environment Setup
  3. Operation Check
  4. References
  5. Source Code
  6. Future Work

1. Overview

Apache Kafka is a Publisher/Subscriber-style distributed messaging system originally developed at LinkedIn. The broker, which mediates messages, is the core of Kafka; brokers coordinate through Apache ZooKeeper, and fault tolerance is achieved by clustering them. Messages are persisted to disk as files and replicated across the brokers in the cluster, which protects against data loss.

A Producer sends messages to a broker, and the messages accumulate in a queue on the broker called a Topic.

Consumers retrieve messages from the broker by polling, so each Consumer can fetch messages at a rate that matches its own processing capacity (parallelism, throughput), and the broker does not have to manage the transfer rate. When Producers send messages faster than the Consumers can process them, the broker acts as a task queue and the messages accumulate there.
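
As a concrete illustration of this flow (not part of the original article, just a minimal sketch using the kafka-clients Java library), the following program publishes one record to the test01 topic used later in this article and then pulls it back with poll(). The broker address localhost:9092 matches the single-node setup below; the group id is an arbitrary placeholder. Fuller producer and consumer sketches appear alongside the Java examples in section 3.

// Minimal sketch (not from the original article) of the flow described above:
// a Producer pushes a record onto a Topic, and a Consumer pulls it back with poll().
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PubSubSketch {
    public static void main(String[] args) {
        // Producer side: send a message to the broker; it is appended to the Topic.
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
            producer.send(new ProducerRecord<String, String>("test01", "hello"));
        }

        // Consumer side: pull messages from the broker at the Consumer's own pace.
        Properties c = new Properties();
        c.put("bootstrap.servers", "localhost:9092");
        c.put("group.id", "overview-sketch");   // group id is an arbitrary placeholder
        c.put("auto.offset.reset", "earliest");
        c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c)) {
            consumer.subscribe(Collections.singletonList("test01"));
            ConsumerRecords<String, String> records = consumer.poll(5000); // pull model
            for (ConsumerRecord<String, String> r : records) {
                System.out.println(r.value() + " @ offset " + r.offset());
            }
        }
    }
}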

2. Environment Setup

Installing Scala

[root@Alabaster ~]# cd /tmp
[root@Alabaster tmp]# ls
scala-2.12.4.tgz

[root@Alabaster tmp]# tar xvzf scala-2.12.4.tgz
scala-2.12.4/
scala-2.12.4/lib/
scala-2.12.4/lib/scala-swing_2.12-2.0.0.jar
scala-2.12.4/lib/scala-reflect.jar
scala-2.12.4/lib/scala-parser-combinators_2.12-1.0.6.jar
scala-2.12.4/lib/scala-compiler.jar
scala-2.12.4/lib/scala-library.jar
scala-2.12.4/lib/scala-xml_2.12-1.0.6.jar
scala-2.12.4/lib/scalap-2.12.4.jar
scala-2.12.4/lib/jline-2.14.5.jar
scala-2.12.4/bin/
scala-2.12.4/bin/scala
scala-2.12.4/bin/scalac.bat
scala-2.12.4/bin/scala.bat
scala-2.12.4/bin/scalap
scala-2.12.4/bin/scalap.bat
scala-2.12.4/bin/scaladoc.bat
scala-2.12.4/bin/fsc
scala-2.12.4/bin/fsc.bat
scala-2.12.4/bin/scalac
scala-2.12.4/bin/scaladoc
scala-2.12.4/man/
scala-2.12.4/man/man1/
scala-2.12.4/man/man1/scalac.1
scala-2.12.4/man/man1/scaladoc.1
scala-2.12.4/man/man1/fsc.1
scala-2.12.4/man/man1/scala.1
scala-2.12.4/man/man1/scalap.1
scala-2.12.4/doc/
scala-2.12.4/doc/tools/
scala-2.12.4/doc/tools/scala.html
scala-2.12.4/doc/tools/index.html
scala-2.12.4/doc/tools/images/
scala-2.12.4/doc/tools/images/scala_logo.png
scala-2.12.4/doc/tools/images/external.gif
scala-2.12.4/doc/tools/fsc.html
scala-2.12.4/doc/tools/scalac.html
scala-2.12.4/doc/tools/css/
scala-2.12.4/doc/tools/css/style.css
scala-2.12.4/doc/tools/scalap.html
scala-2.12.4/doc/tools/scaladoc.html
scala-2.12.4/doc/License.rtf
scala-2.12.4/doc/licenses/
scala-2.12.4/doc/licenses/bsd_asm.txt
scala-2.12.4/doc/licenses/mit_jquery.txt
scala-2.12.4/doc/licenses/bsd_jline.txt
scala-2.12.4/doc/licenses/mit_sizzle.txt
scala-2.12.4/doc/licenses/mit_tools.tooltip.txt
scala-2.12.4/doc/licenses/apache_jansi.txt
scala-2.12.4/doc/LICENSE.md
scala-2.12.4/doc/README

[root@Alabaster tmp]# mv scala-2.12.4 /usr/local/scala-2.12.4
[root@Alabaster local]# ln -s /usr/local/scala-2.12.4 /usr/local/scala
[root@Alabaster local]# cd /usr/local/scala
[root@Alabaster scala]# cd bin
[root@Alabaster bin]# ls
fsc fsc.bat scala scala.bat scalac scalac.bat scaladoc scaladoc.bat scalap scalap.bat
[root@Alabaster bin]# ./scala -version
Scala code runner version 2.12.4 -- Copyright 2002-2017, LAMP/EPFL and Lightbend, Inc.

Installing Apache Kafka

Download the Apache Kafka binary. Note that which binary to download depends on your Scala version.
  ◎ Apache Kafka official site
  ◎ Apache official site

[root@Alabaster tmp]# tar xvzf kafka_2.12-0.11.0.1.gz
kafka_2.12-0.11.0.1/
kafka_2.12-0.11.0.1/LICENSE
kafka_2.12-0.11.0.1/NOTICE
kafka_2.12-0.11.0.1/bin/
kafka_2.12-0.11.0.1/bin/connect-distributed.sh
kafka_2.12-0.11.0.1/bin/connect-standalone.sh
kafka_2.12-0.11.0.1/bin/kafka-acls.sh
kafka_2.12-0.11.0.1/bin/kafka-broker-api-versions.sh
kafka_2.12-0.11.0.1/bin/kafka-configs.sh
kafka_2.12-0.11.0.1/bin/kafka-console-consumer.sh
kafka_2.12-0.11.0.1/bin/kafka-console-producer.sh
kafka_2.12-0.11.0.1/bin/kafka-consumer-groups.sh
kafka_2.12-0.11.0.1/bin/kafka-consumer-offset-checker.sh
kafka_2.12-0.11.0.1/bin/kafka-consumer-perf-test.sh
kafka_2.12-0.11.0.1/bin/kafka-delete-records.sh
kafka_2.12-0.11.0.1/bin/kafka-mirror-maker.sh
kafka_2.12-0.11.0.1/bin/kafka-preferred-replica-election.sh
kafka_2.12-0.11.0.1/bin/kafka-producer-perf-test.sh
kafka_2.12-0.11.0.1/bin/kafka-reassign-partitions.sh
kafka_2.12-0.11.0.1/bin/kafka-replay-log-producer.sh
kafka_2.12-0.11.0.1/bin/kafka-replica-verification.sh
kafka_2.12-0.11.0.1/bin/kafka-run-class.sh
kafka_2.12-0.11.0.1/bin/kafka-server-start.sh
kafka_2.12-0.11.0.1/bin/kafka-server-stop.sh
kafka_2.12-0.11.0.1/bin/kafka-simple-consumer-shell.sh
kafka_2.12-0.11.0.1/bin/kafka-streams-application-reset.sh
kafka_2.12-0.11.0.1/bin/kafka-topics.sh
kafka_2.12-0.11.0.1/bin/kafka-verifiable-consumer.sh
kafka_2.12-0.11.0.1/bin/kafka-verifiable-producer.sh
kafka_2.12-0.11.0.1/bin/windows/
kafka_2.12-0.11.0.1/bin/windows/connect-distributed.bat
kafka_2.12-0.11.0.1/bin/windows/connect-standalone.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-acls.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-broker-api-versions.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-configs.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-console-consumer.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-console-producer.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-consumer-groups.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-consumer-offset-checker.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-consumer-perf-test.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-mirror-maker.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-preferred-replica-election.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-producer-perf-test.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-reassign-partitions.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-replay-log-producer.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-replica-verification.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-run-class.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-server-start.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-server-stop.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-simple-consumer-shell.bat
kafka_2.12-0.11.0.1/bin/windows/kafka-topics.bat
kafka_2.12-0.11.0.1/bin/windows/zookeeper-server-start.bat
kafka_2.12-0.11.0.1/bin/windows/zookeeper-server-stop.bat
kafka_2.12-0.11.0.1/bin/windows/zookeeper-shell.bat
kafka_2.12-0.11.0.1/bin/zookeeper-security-migration.sh
kafka_2.12-0.11.0.1/bin/zookeeper-server-start.sh
kafka_2.12-0.11.0.1/bin/zookeeper-server-stop.sh
kafka_2.12-0.11.0.1/bin/zookeeper-shell.sh
kafka_2.12-0.11.0.1/config/
kafka_2.12-0.11.0.1/config/connect-console-sink.properties
kafka_2.12-0.11.0.1/config/connect-console-source.properties
kafka_2.12-0.11.0.1/config/connect-distributed.properties
kafka_2.12-0.11.0.1/config/connect-file-sink.properties
kafka_2.12-0.11.0.1/config/connect-file-source.properties
kafka_2.12-0.11.0.1/config/connect-log4j.properties
kafka_2.12-0.11.0.1/config/connect-standalone.properties
kafka_2.12-0.11.0.1/config/consumer.properties
kafka_2.12-0.11.0.1/config/log4j.properties
kafka_2.12-0.11.0.1/config/producer.properties
kafka_2.12-0.11.0.1/config/server.properties
kafka_2.12-0.11.0.1/config/tools-log4j.properties
kafka_2.12-0.11.0.1/config/zookeeper.properties
kafka_2.12-0.11.0.1/libs/
kafka_2.12-0.11.0.1/libs/kafka-clients-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/jopt-simple-5.0.3.jar
kafka_2.12-0.11.0.1/libs/metrics-core-2.2.0.jar
kafka_2.12-0.11.0.1/libs/scala-library-2.12.2.jar
kafka_2.12-0.11.0.1/libs/slf4j-log4j12-1.7.25.jar
kafka_2.12-0.11.0.1/libs/zkclient-0.10.jar
kafka_2.12-0.11.0.1/libs/zookeeper-3.4.10.jar
kafka_2.12-0.11.0.1/libs/scala-parser-combinators_2.12-1.0.4.jar
kafka_2.12-0.11.0.1/libs/lz4-1.3.0.jar
kafka_2.12-0.11.0.1/libs/snappy-java-1.1.2.6.jar
kafka_2.12-0.11.0.1/libs/slf4j-api-1.7.25.jar
kafka_2.12-0.11.0.1/libs/log4j-1.2.17.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-sources.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-sources.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-javadoc.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-javadoc.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test-sources.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-test-sources.jar.asc
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-scaladoc.jar
kafka_2.12-0.11.0.1/libs/kafka_2.12-0.11.0.1-scaladoc.jar.asc
kafka_2.12-0.11.0.1/site-docs/
kafka_2.12-0.11.0.1/site-docs/kafka_2.12-0.11.0.1-site-docs.tgz
kafka_2.12-0.11.0.1/libs/kafka-tools-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka-log4j-appender-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/argparse4j-0.7.0.jar
kafka_2.12-0.11.0.1/libs/jackson-databind-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jackson-annotations-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jackson-core-2.8.5.jar
kafka_2.12-0.11.0.1/libs/connect-api-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/connect-runtime-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/connect-transforms-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/jackson-jaxrs-json-provider-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jersey-container-servlet-2.24.jar
kafka_2.12-0.11.0.1/libs/jetty-server-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-servlet-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-servlets-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/reflections-0.9.11.jar
kafka_2.12-0.11.0.1/libs/maven-artifact-3.5.0.jar
kafka_2.12-0.11.0.1/libs/jackson-jaxrs-base-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jackson-module-jaxb-annotations-2.8.5.jar
kafka_2.12-0.11.0.1/libs/jersey-container-servlet-core-2.24.jar
kafka_2.12-0.11.0.1/libs/jersey-common-2.24.jar
kafka_2.12-0.11.0.1/libs/jersey-server-2.24.jar
kafka_2.12-0.11.0.1/libs/javax.ws.rs-api-2.0.1.jar
kafka_2.12-0.11.0.1/libs/javax.servlet-api-3.1.0.jar
kafka_2.12-0.11.0.1/libs/jetty-http-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-io-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-security-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-continuation-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/jetty-util-9.2.15.v20160210.jar
kafka_2.12-0.11.0.1/libs/guava-20.0.jar
kafka_2.12-0.11.0.1/libs/javassist-3.21.0-GA.jar
kafka_2.12-0.11.0.1/libs/plexus-utils-3.0.24.jar
kafka_2.12-0.11.0.1/libs/commons-lang3-3.5.jar
kafka_2.12-0.11.0.1/libs/javax.inject-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/javax.annotation-api-1.2.jar
kafka_2.12-0.11.0.1/libs/jersey-guava-2.24.jar
kafka_2.12-0.11.0.1/libs/hk2-api-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/hk2-locator-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/osgi-resource-locator-1.0.1.jar
kafka_2.12-0.11.0.1/libs/jersey-client-2.24.jar
kafka_2.12-0.11.0.1/libs/jersey-media-jaxb-2.24.jar
kafka_2.12-0.11.0.1/libs/validation-api-1.1.0.Final.jar
kafka_2.12-0.11.0.1/libs/hk2-utils-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/aopalliance-repackaged-2.5.0-b05.jar
kafka_2.12-0.11.0.1/libs/javax.inject-1.jar
kafka_2.12-0.11.0.1/libs/connect-json-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/connect-file-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka-streams-0.11.0.1.jar
kafka_2.12-0.11.0.1/libs/rocksdbjni-5.0.1.jar
kafka_2.12-0.11.0.1/libs/kafka-streams-examples-0.11.0.1.jar

[root@Alabaster tmp]# mv kafka_2.12-0.11.0.1 /usr/local/
[root@Alabaster local]# ln -s /usr/local/kafka_2.12-0.11.0.1 /usr/local/kafka

3. Operation Check

Starting ZooKeeper

[root@Alabaster bin]# pwd
/usr/local/kafka/bin
[root@Alabaster bin]# ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties

Starting Kafka

[root@Alabaster bin]# ./kafka-server-start.sh -daemon ../config/server.properties

Creating a Topic (create a Topic named test01)

[root@Alabaster bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test01
Created topic "test01".

[root@Alabaster bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
test01
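
The topic was created above with the bundled kafka-topics.sh script. As a side note (not something used in this article), the 0.11 Java client also ships an AdminClient that can create and list topics programmatically; a minimal sketch, mirroring the partition count and replication factor of the command above, might look like this:

// Sketch: create and list the test01 topic with the 0.11 AdminClient
// (an alternative to kafka-topics.sh; not used in this article).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- same as the kafka-topics.sh call above
            NewTopic topic = new NewTopic("test01", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            // Equivalent of kafka-topics.sh --list
            System.out.println(admin.listTopics().names().get());
        }
    }
}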

Operation check (sending and receiving messages with the commands bundled with Kafka)

  ◎ Sending messages

[root@Alabaster bin]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test01
>aaaa
>dddd
>exit
>quit
<Ctrl+C>

  ◎ Receiving messages

[root@Alabaster bin]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test01 --from-beginning
aaaa
dddd
exit
quit
^C Processed a total of 4 messages

Message send/receive application (Java version)

Sending messages from the Producer

[root@Alabaster tmp]# java -cp XXXXXX jp.a_frontier.kafka.sample.SampleProducer ABC 7
[main]Start
    msg=[ABC]
    count=7
[Start]コンストラクタ
[Start]configure()
[End]configure()
[End]コンストラクタ
msg=ABC
messageCount=7
ProducerRecord[Maked]
i=0
i=1
i=2
i=3
i=4
i=5
i=6
for[End]
flush[End]
elapsedTime=474
close[End]
[main]End
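
The actual SampleProducer source is only linked in section 5 and is not reproduced here, so the following is a hypothetical sketch of a producer that would behave like the log above: it takes a message prefix and a count from the command line and sends prefix+index values (ABC0 ... ABC6) to test01. The class name, configuration choices, and log lines are my assumptions, not the author's code.

// Hypothetical sketch of a SampleProducer-style program (not the original source).
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SampleProducerSketch {
    public static void main(String[] args) {
        String msg = args[0];                          // e.g. "ABC"
        int messageCount = Integer.parseInt(args[1]);  // e.g. 7

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        long start = System.currentTimeMillis();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < messageCount; i++) {
                // Value is the prefix plus the loop index, e.g. ABC0, ABC1, ...
                producer.send(new ProducerRecord<String, String>("test01", msg + i));
                System.out.println("i=" + i);
            }
            producer.flush();   // make sure everything has reached the broker
            System.out.println("elapsedTime=" + (System.currentTimeMillis() - start));
        }
    }
}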

The Consumer fetches and displays the messages from the Topic

[root@Alabaster ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test01
ABC0
ABC1
ABC2
ABC3
ABC4
ABC5
ABC6
^CProcessed a total of 7 messages

Fetching messages with the Consumer

[root@Alabaster ~]# java -cp XXXXX jp.a_frontier.kafka.sample.SimpleConsumer
run():Topic=test01
run():subscribe
run():records
Received message: (null, aaaa) at offset 0
Received message: (null, dddd) at offset 1
Received message: (null, exit) at offset 2
Received message: (null, quit) at offset 3
Received message: (null, abcd) at offset 4
Received message: (null, dddd) at offset 5
Received message: (null, dddddd) at offset 6
Received message: (null, fdfe) at offset 7
Received message: (null, sdfsafe) at offset 8
Received message: (null, kfdsajfie) at offset 9
Received message: (null, a) at offset 10
Received message: (null, a) at offset 11
  ・
  ・
  ・
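
Likewise, the SimpleConsumer source is only linked in section 5; the sketch below is a hypothetical reconstruction of a consumer that prints records in the format shown above (subscribe to test01, poll in a loop, print key, value, and offset). The group id and configuration values are placeholders of my own.

// Hypothetical sketch of a SimpleConsumer-style program (not the original source).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-consumer");     // placeholder group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");   // start from offset 0 on first run
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            System.out.println("run():Topic=test01");
            consumer.subscribe(Collections.singletonList("test01"));
            while (true) {
                // poll(long) is the 0.11.x API; poll(Duration) arrived in later releases
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received message: (%s, %s) at offset %d%n",
                            record.key(), record.value(), record.offset());
                }
            }
        }
    }
}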

4. References

  ◎ Javadoc for the Kafka 0.11.0.1 API

[root@Alabaster ~]# /usr/local/kafka/bin/kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                    Description
------                                    -----------
--blacklist <String: blacklist>           Blacklist of topics to exclude from
                                            consumption.
--bootstrap-server <String: server to     REQUIRED (unless old consumer is
  connect to>                               used): The server to connect to.
--consumer-property <String:              A mechanism to pass user-defined
  consumer_prop>                            properties in the form key=value to
                                            the consumer.
--consumer.config <String: config file>   Consumer config properties file. Note
                                            that [consumer-property] takes
                                            precedence over this config.
--csv-reporter-enabled                    If set, the CSV metrics reporter will
                                            be enabled
--delete-consumer-offsets                 If specified, the consumer path in
                                            zookeeper is deleted when starting up
--enable-systest-events                   Log lifecycle events of the consumer
                                            in addition to logging consumed
                                            messages. (This is specific for
                                            system tests.)
--formatter <String: class>               The name of a class to use for
                                            formatting kafka messages for
                                            display. (default: kafka.tools.
                                            DefaultMessageFormatter)
--from-beginning                          If the consumer does not already have
                                            an established offset to consume
                                            from, start with the earliest
                                            message present in the log rather
                                            than the latest message.
--isolation-level <String>                Set to read_committed in order to
                                            filter out transactional messages
                                            which are not committed. Set to
                                            read_uncommitted to read all
                                            messages. (default: read_uncommitted)
--key-deserializer <String:
  deserializer for key>
--max-messages <Integer: num_messages>    The maximum number of messages to
                                            consume before exiting. If not set,
                                            consumption is continual.
--metrics-dir <String: metrics            If csv-reporter-enable is set, and
  directory>                                this parameter is set, the csv
                                            metrics will be output here
--new-consumer                            Use the new consumer implementation.
                                            This is the default.
--offset <String: consume offset>         The offset id to consume from (a non-
                                            negative number), or 'earliest'
                                            which means from beginning, or
                                            'latest' which means from end
                                            (default: latest)
--partition <Integer: partition>          The partition to consume from.
                                            Consumption starts from the end of
                                            the partition unless '--offset' is
                                            specified.
--property <String: prop>                 The properties to initialize the
                                            message formatter.
--skip-message-on-error                   If there is an error when processing a
                                            message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>        If specified, exit if no message is
                                            available for consumption for the
                                            specified interval.
--topic <String: topic>                   The topic id to consume on.
--value-deserializer <String:
  deserializer for values>
--whitelist <String: whitelist>           Whitelist of topics to include for
                                            consumption.
--zookeeper <String: urls>                REQUIRED (only when using old
                                            consumer): The connection string for
                                            the zookeeper connection in the form
                                            host:port. Multiple URLS can be
                                            given to allow fail-over.

5. Source Code

  ◎ sampleproducer.java
  ◎ simpleconsumer.java

6. Future Work

  ◎ Visualizing messages with Trifecta
  ◎ Key points for optimally designing Topic partitioning and clustering units
  ◎ Performance of a multi-node ZooKeeper cluster
  ◎ Operational monitoring items
  ◎ Nginx → Tomcat → Kafka → Storm integration