Java 101:苦痛のないJava並行性、パート1

並行アプリケーションがますます複雑になるにつれ、多くの開発者は、Javaの低レベルのスレッド機能がプログラミングのニーズに対して不十分であることに気づきます。その場合は、Java同時実行ユーティリティを見つける時期かもしれません。java.util.concurrentJeff FriesenによるExecutorフレームワーク、シンクロナイザータイプ、およびJava ConcurrentCollectionsパッケージの詳細な紹介から始めましょう。

Java 101:次世代

この新しいJavaWorldシリーズの最初の記事では、Java Date and TimeAPIを紹介しています

Javaプラットフォームは、開発者が異なるスレッドが同時に実行される並行アプリケーションを作成できるようにする低レベルのスレッド機能を提供します。ただし、標準のJavaスレッドにはいくつかの欠点があります。

  • Javaの低レベルの並行処理プリミティブ(synchronizedvolatilewait()notify()、およびnotifyAll())を正しく使用することは容易ではないです。プリミティブの誤った使用に起因するデッドロック、スレッドの枯渇、競合状態などのスレッドの危険性も、検出およびデバッグが困難です。
  • synchronizedスレッド間のアクセスの調整に依存すると、アプリケーションのスケーラビリティに影響を与えるパフォーマンスの問題が発生します。これは、多くの最新のアプリケーションの要件です。
  • Javaの基本的なスレッド機能はあまりにも低レベル。開発者は、セマフォやスレッドプールなど、Javaの低レベルのスレッド機能では提供されない高レベルの構造を必要とすることがよくあります。その結果、開発者は独自の構造を構築することになりますが、これには時間がかかり、エラーが発生しやすくなります。

JSR 166:同時実行ユーティリティフレームワークは、高レベルのスレッド機能のニーズを満たすように設計されました。2002年の初めに開始されたフレームワークは、2年後にJava 5で正式化され、実装されました。Java6、Java 7、および今後のJava8で機能拡張が行われました。

この2部構成のJava101:次世代シリーズでは、基本的なJavaスレッドに精通したソフトウェア開発者にJava ConcurrencyUtilitiesパッケージとフレームワークを紹介します。パート1では、Java Concurrency Utilitiesフレームワークの概要を示し、そのExecutorフレームワーク、シンクロナイザーユーティリティ、およびJava ConcurrentCollectionsパッケージを紹介します。

Javaスレッドを理解する

このシリーズに飛び込む前に、スレッドの基本に精通していることを確認してください。で開始するJava 101 Javaの低レベルのスレッド機能の紹介:

  • パート1:スレッドとランナブルの紹介
  • パート2:スレッドの同期
  • パート3:スレッドのスケジューリング、待機/通知、およびスレッドの中断
  • パート4:スレッドグループ、ボラティリティ、スレッドローカル変数、タイマー、スレッドの停止

Java同時実行ユーティリティの内部

Java Concurrency Utilitiesフレームワークは、並行クラスまたはアプリケーションを作成するためのビルディングブロックとして使用されるように設計されたタイプのライブラリです。これらのタイプはスレッドセーフであり、徹底的にテストされており、高性能を提供します。

Java Concurrency Utilitiesのタイプは、小さなフレームワークに編成されています。つまり、エグゼキュータフレームワーク、シンクロナイザー、同時コレクション、ロック、アトミック変数、およびフォーク/結合です。これらはさらに、メインパッケージとサブパッケージのペアに編成されています。

  • java.util.concurrentには、並行プログラミングで一般的に使用される高レベルのユーティリティタイプが含まれています。例としては、セマフォ、バリア、スレッドプール、同時ハッシュマップなどがあります。
    • java.util.concurrent.atomicサブパッケージは、単一の変数に対するサポートロックフリーでスレッドセーフなプログラミングという低レベルのユーティリティクラスが含まれています。
    • java.util.concurrent.locksのサブパッケージは、ロックとJavaの低レベル同期およびモニターを使用して異なる条件、待ち低レベルのユーティリティタイプを含みます。

Java Concurrency Utilitiesフレームワークは、低レベルのコンペアアンドスワップ(CAS)ハードウェア命令も公開します。そのバリアントは、最新のプロセッサで一般的にサポートされています。CASは、Javaのモニターベースの同期メカニズムよりもはるかに軽量であり、拡張性の高い並行クラスを実装するために使用されます。java.util.concurrent.locks.ReentrantLockたとえば、CASベースのクラスは、同等のモニターベースのsynchronizedプリミティブよりもパフォーマンスが高くなります。ReentrantLockロックをより細かく制御できます。(パート2では、CASがどのように機能するかについて詳しく説明しjava.util.concurrentます。)

System.nanoTime()

Java Concurrency Utilitiesフレームワークにはlong nanoTime()java.lang.Systemクラスのメンバーであるが含まれています。この方法により、相対時間測定を行うためのナノ秒粒度の時間ソースにアクセスできます。

次のセクションでは、Java同時実行ユーティリティの3つの便利な機能を紹介します。最初に、それらが最新の同時実行にとって非常に重要である理由を説明し、次に、並行Javaアプリケーションの速度、信頼性、効率、およびスケーラビリティを向上させるためにそれらがどのように機能するかを示します。

エグゼキュータフレームワーク

スレッド化では、タスク作業の単位です。リスト1に示すように、Javaでの低レベルのスレッド化に関する問題の1つは、タスクの送信がタスク実行ポリシーと緊密に結合されていることです。

リスト1.Server.java(バージョン1)

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; class Server { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(9000); while (true) { final Socket s = socket.accept(); Runnable r = new Runnable() { @Override public void run() { doWork(s); } }; new Thread(r).start(); } } static void doWork(Socket s) { } }

上記のコードは、単純なサーバーアプリケーションを記述しています(doWork(Socket)簡潔にするために空のままにしておきます)。サーバースレッドsocket.accept()は、着信要求を待機するために繰り返し呼び出し、到着時にこの要求を処理するスレッドを開始します。

このアプリケーションはリクエストごとに新しいスレッドを作成するため、膨大な数のリクエストに直面した場合、適切に拡張できません。たとえば、作成された各スレッドにはメモリが必要であり、スレッドが多すぎると使用可能なメモリが使い果たされ、アプリケーションが強制的に終了する可能性があります。

この問題は、タスク実行ポリシーを変更することで解決できます。常に新しいスレッドを作成するのではなく、固定数のスレッドが着信タスクを処理するスレッドプールを使用できます。ただし、この変更を行うには、アプリケーションを書き直す必要があります。

java.util.concurrentエグゼキュータフレームワークが含まれています。これは、タスクの送信をタスク実行ポリシーから切り離すタイプの小さなフレームワークです。Executorフレームワークを使用すると、コードを大幅に書き直すことなく、プログラムのタスク実行ポリシーを簡単に調整できます。

Executorフレームワークの内部

The Executor framework is based on the Executor interface, which describes an executor as any object capable of executing java.lang.Runnable tasks. This interface declares the following solitary method for executing a Runnable task:

void execute(Runnable command)

You submit a Runnable task by passing it to execute(Runnable). If the executor cannot execute the task for any reason (for instance, if the executor has been shut down), this method will throw a RejectedExecutionException.

The key concept is that task submission is decoupled from the task-execution policy, which is described by an Executor implementation. The runnable task is thus able to execute via a new thread, a pooled thread, the calling thread, and so on.

Note that Executor is very limited. For example, you can't shut down an executor or determine whether an asynchronous task has finished. You also can't cancel a running task. For these and other reasons, the Executor framework provides an ExecutorService interface, which extends Executor.

Five of ExecutorService's methods are especially noteworthy:

  • boolean awaitTermination(long timeout, TimeUnit unit) blocks the calling thread until all tasks have completed execution after a shutdown request, the timeout occurs, or the current thread is interrupted, whichever happens first. The maximum time to wait is specified by timeout, and this value is expressed in the unit units specified by the TimeUnit enum; for example, TimeUnit.SECONDS. This method throws java.lang.InterruptedException when the current thread is interrupted. It returns true when the executor is terminated and false when the timeout elapses before termination.
  • boolean isShutdown() returns true when the executor has been shut down.
  • void shutdown() initiates an orderly shutdown in which previously submitted tasks are executed but no new tasks are accepted.
  • Future submit(Callable task) submits a value-returning task for execution and returns a Future representing the pending results of the task.
  • Future submit(Runnable task) submits a Runnable task for execution and returns a Future representing that task.

The Future interface represents the result of an asynchronous computation. The result is known as a future because it typically will not be available until some moment in the future. You can invoke methods to cancel a task, return a task's result (waiting indefinitely or for a timeout to elapse when the task hasn't finished), and determine if a task has been cancelled or has finished.

The Callable interface is similar to the Runnable interface in that it provides a single method describing a task to execute. Unlike Runnable's void run() method, Callable's V call() throws Exception method can return a value and throw an exception.

Executor factory methods

At some point, you'll want to obtain an executor. The Executor framework supplies the Executors utility class for this purpose. Executors offers several factory methods for obtaining different kinds of executors that offer specific thread-execution policies. Here are three examples:

  • ExecutorService newCachedThreadPool() creates a thread pool that creates new threads as needed, but which reuses previously constructed threads when they're available. Threads that haven't been used for 60 seconds are terminated and removed from the cache. This thread pool typically improves the performance of programs that execute many short-lived asynchronous tasks.
  • ExecutorService newSingleThreadExecutor() creates an executor that uses a single worker thread operating off an unbounded queue -- tasks are added to the queue and execute sequentially (no more than one task is active at any one time). If this thread terminates through failure during execution before shutdown of the executor, a new thread will be created to take its place when subsequent tasks need to be executed.
  • ExecutorService newFixedThreadPool(int nThreads) creates a thread pool that re-uses a fixed number of threads operating off a shared unbounded queue. At most nThreads threads are actively processing tasks. If additional tasks are submitted when all threads are active, they wait in the queue until a thread is available. If any thread terminates through failure during execution before shutdown, a new thread will be created to take its place when subsequent tasks need to be executed. The pool's threads exist until the executor is shut down.

The Executor framework offers additional types (such as the ScheduledExecutorService interface), but the types you are likely to work with most often are ExecutorService, Future, Callable, and Executors.

See the java.util.concurrent Javadoc to explore additional types.

Executorフレームワークの操作

Executorフレームワークはかなり簡単に操作できることがわかります。リスト2では、私が使ってきたExecutorし、Executorsよりスケーラブルなスレッドプールベースの代替とリスト1からサーバの例を交換します。

リスト2.Server.java(バージョン2)

import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.Executors; class Server { static Executor pool = Executors.newFixedThreadPool(5); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(9000); while (true) { final Socket s = socket.accept(); Runnable r = new Runnable() { @Override public void run() { doWork(s); } }; pool.execute(r); } } static void doWork(Socket s) { } }

リスト2はnewFixedThreadPool(int)、5つのスレッドを再利用するスレッドプールベースのエグゼキューターを取得するために使用します。また、これらのスレッドのいずれかを介して実行可能なタスクを実行するために置き換えnew Thread(r).start();られpool.execute(r);ます。

リスト3は、アプリケーションが任意のWebページのコンテンツを読み取る別の例を示しています。最大5秒以内にコンテンツが利用できない場合は、結果の行またはエラーメッセージが出力されます。