リアルタイム向けに構築:Apache Kafkaによるビッグデータメッセージング、パート1

ビッグデータの移動が始まったとき、それは主にバッチ処理に焦点を合わせていました。 MapReduce、Hive、Pigなどの分散データストレージおよびクエリツールはすべて、データを継続的にではなくバッチで処理するように設計されています。企業は毎晩複数のジョブを実行してデータベースからデータを抽出し、分析、変換、そして最終的にはデータを保存していました。最近、企業は、数時間に1回だけでなく、発生したデータやイベントを分析および処理する能力を発見しました。ただし、ほとんどの従来のメッセージングシステムは、ビッグデータをリアルタイムで処理するようにスケールアップしていません。そのため、LinkedInのエンジニアは、コモディティハードウェアを拡張することでビッグデータの需要を満たす分散メッセージングフレームワークであるApacheKafkaを構築してオープンソース化しました。

過去数年にわたって、ApacheKafkaはさまざまなユースケースを解決するために登場しました。最も単純なケースでは、アプリケーションログを保存するための単純なバッファである可能性があります。 Spark Streamingなどのテクノロジーと組み合わせると、データの変更を追跡し、最終的な宛先に保存する前にそのデータに対してアクションを実行するために使用できます。 Kafkaの予測モードは、クレジットカード取引が発生したときにその有効性を確認したり、数時間後にバッチ処理を待たなかったりするなど、不正を検出するための強力なツールになります。

この2部構成のチュートリアルでは、Kafkaを紹介し、開発環境にインストールして実行する方法から始めます。Kafkaのアーキテクチャの概要を説明した後、すぐに使用できるApacheKafkaメッセージングシステムの開発について紹介します。最後に、Kafkaサーバーを介してメッセージを送信および消費するカスタムのプロデューサー/コンシューマーアプリケーションを構築します。チュートリアルの後半では、メッセージを分割してグループ化する方法と、Kafkaコンシューマーが消費するメッセージを制御する方法を学習します。

Apache Kafkaとは何ですか?

Apache Kafkaは、ビッグデータに合わせて拡張できるように構築されたメッセージングシステムです。Apache ActiveMQまたはRabbitMqと同様に、Kafkaを使用すると、さまざまなプラットフォームで構築されたアプリケーションが非同期メッセージパッシングを介して通信できます。しかし、Kafkaは、これらの従来のメッセージングシステムとは重要な点で異なります。

  • コモディティサーバーを追加することで、水平方向に拡張できるように設計されています。
  • これは、プロデューサープロセスとコンシューマープロセスの両方にはるかに高いスループットを提供します。
  • バッチとリアルタイムの両方のユースケースをサポートするために使用できます。
  • Javaのメッセージ指向ミドルウェアAPIであるJMSはサポートしていません。

ApacheKafkaのアーキテクチャ

Kafkaのアーキテクチャを検討する前に、その基本的な用語を知っておく必要があります。

  • プロデューサーは、トピックにメッセージを公開することができますプロセスです。
  • 消費者は、 1つのまたは複数のトピックをサブスクライブして、トピックにパブリッシュされたメッセージを消費することができるプロセスです。
  • トピックのカテゴリは、メッセージが公開されたフィードの名前です。
  • ブローカーは、単一のマシン上で実行されるプロセスです。
  • クラスタは、一緒に仕事ブローカーのグループです。

Apache Kafkaのアーキテクチャは非常に単純であるため、一部のシステムではパフォーマンスとスループットが向上する可能性があります。Kafkaのすべてのトピックは、単純なログファイルのようなものです。プロデューサーがメッセージを公開すると、Kafkaサーバーはそのメッセージを特定のトピックのログファイルの最後に追加します。サーバーは、各メッセージを永続的に識別するために使用される番号であるオフセットも割り当てます。メッセージの数が増えると、各オフセットの値が増加します。たとえば、プロデューサーが3つのメッセージを公開する場合、最初のメッセージは1のオフセット、2番目は2のオフセット、3番目は3のオフセットを取得する可能性があります。

Kafkaコンシューマーが最初に起動すると、サーバーにプルリクエストが送信され、オフセット値が0より大きい特定のトピックのメッセージを取得するように求められます。サーバーはそのトピックのログファイルをチェックし、3つの新しいメッセージを返します。 。コンシューマーはメッセージを処理してから、オフセットが3より大きいメッセージの要求を送信します。

Kafkaでは、クライアントはオフセットカウントの記憶とメッセージの取得を担当します。Kafkaサーバーはメッセージの消費を追跡または管理しません。デフォルトでは、Kafkaサーバーは7日間メッセージを保持します。サーバーのバックグラウンドスレッドは、7日以上経過したメッセージをチェックして削除します。コンシューマーは、サーバー上にある限り、メッセージにアクセスできます。メッセージを複数回読み取ることができ、受信と逆の順序でメッセージを読み取ることもできます。しかし、消費者が7日が経過する前にメッセージを取得できなかった場合、そのメッセージを見逃してしまいます。

Kafkaベンチマーク

LinkedInや他の企業による本番環境での使用は、適切な構成でApacheKafkaが毎日数百ギガバイトのデータを処理できることを示しています。2011年、3人のLinkedInエンジニアがベンチマークテストを使用して、KafkaがActiveMQやRabbitMQよりもはるかに高いスループットを達成できることを実証しました。

ApacheKafkaのクイックセットアップとデモ

このチュートリアルではカスタムアプリケーションを作成しますが、すぐに使用できるプロデューサーとコンシューマーを使用してKafkaインスタンスをインストールしてテストすることから始めましょう。

  1. Kafkaダウンロードページにアクセスして、最新バージョン(この記事の執筆時点では0.9)をインストールしてください。
  2. バイナリをsoftware/kafkaフォルダに抽出します。現在のバージョンでは、software/kafka_2.11-0.9.0.0です。
  3. 新しいフォルダを指すように現在のディレクトリを変更します。
  4. 次のコマンドを実行して、Zookeeperサーバーを起動しますbin/zookeeper-server-start.sh config/zookeeper.properties
  5. 以下を実行して、Kafkaサーバーを起動しますbin/kafka-server-start.sh config/server.properties
  6. テストに使用できるテストトピックを作成しますbin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld
  7. 以下のような特定のトピックにパブリッシュされたメッセージを、消費することができるシンプルなコンソール消費者を起動しますjavaworldbin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning
  8. テストトピックにメッセージを公開できる単純なプロデューサーコンソールを起動しますbin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld
  9. プロデューサーコンソールに1つまたは2つのメッセージを入力してみてください。メッセージはコンシューマコンソールに表示されます。

ApacheKafkaを使用したアプリケーションの例

ApacheKafkaが箱から出してどのように機能するかを見てきました。次に、カスタムの生産者/消費者アプリケーションを開発しましょう。プロデューサーはコンソールからユーザー入力を取得し、新しい各行をメッセージとしてKafkaサーバーに送信します。コンシューマーは、特定のトピックのメッセージを取得して、コンソールに出力します。この場合、生産者と消費者のコンポーネントは、独自の実装であるkafka-console-producer.shkafka-console-consumer.sh

Producer.javaクラスを作成することから始めましょう。このクライアントクラスには、コンソールからユーザー入力を読み取り、その入力をメッセージとしてKafkaサーバーに送信するロジックが含まれています。

We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the different properties available, but Kafka's default values are sufficient for most uses. For the default config we only need to set three mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the value of the first broker's host:port. The Kafka client will use this value to make a discover call on the broker, which will return a list of all the brokers in the cluster. It's a good idea to specify more than one broker in the BOOTSTRAP_SERVERS_CONFIG, so that if that first broker is down the client will be able to try other brokers.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka's client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn't have a message-specific key, so we'll use null for the key. For the value we'll use a String, which is the data entered by the user on the console.

To configure the message key, we set a value of KEY_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn't need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG on the org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].

Custom key/value objects

Similar to StringSerializer, Kafka provides serializers for other primitives such as int and long. In order to use a custom object for our key or value, we would need to create a class implementing org.apache.kafka.common.serialization.Serializer. We could then add logic to serialize the class into byte[]. We would also have to use a corresponding deserializer in our consumer code.

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

サンプルアプリケーションの場合、プロデューサーがByteArraySerializerキーとStringSerializer値に使用していることがわかります。したがって、クライアント側ではorg.apache.kafka.common.serialization.ByteArrayDeserializer、キーとorg.apache.kafka.common.serialization.StringDeserializer値に使用する必要があります。値としてこれらのクラスを設定KEY_DESERIALIZER_CLASS_CONFIGし、VALUE_DESERIALIZER_CLASS_CONFIGデシリアライズするために消費者を可能にするbyte[]生産者によって送信された符号化されたタイプ。

最後に、の値を設定する必要がありGROUP_ID_CONFIGます。これは、文字列形式のグループ名である必要があります。この構成については、後ほど詳しく説明します。今のところ、4つの必須プロパティが設定されているKafkaコンシューマーを見てください。