Java-ハンギングスレッドの検出と処理

アレックスによる。C.プネン

建築家–ノキアシーメンスネットワークス

バンガロール

ハングスレッドは、SNMP、Q3、Telnetなどの独自仕様または標準化されたインターフェイスを使用して独自仕様のデバイスとインターフェイスする必要があるソフトウェアの開発における一般的な課題です。この問題は、ネットワーク管理だけでなく、Webサーバー、リモートプロシージャコールを呼び出すプロセスなど、さまざまな分野で発生します。

デバイスへの要求を開始するスレッドには、デバイスが応答しないか、部分的にしか応答しない場合に検出するメカニズムが必要です。このようなハングが検出された場合は、特定のアクションを実行する必要があります。具体的なアクションは、再試行するか、タスクの失敗またはその他の回復オプションについてエンドユーザーに通知することです。コンポーネントによって多数のタスクを多数のネットワーク要素に対して実行する必要がある場合は、他のタスク処理のボトルネックにならないように、ハングスレッドの検出が重要です。したがって、ハングしているスレッドの管理には、パフォーマンス通知という2つの側面があります。

通知側面我々は、マルチスレッドの世界にフィットするためのJava Observerパターンを調整することができます。

マルチスレッドシステムに合わせたJavaオブザーバーパターンの調整

タスクがハングするためThreadPool、適切な戦略でJavaクラスを使用することが最初に頭に浮かぶ解決策です。ただしThreadPool、一定期間にわたってランダムにハングする一部のスレッドのコンテキストでJavaを使用すると、固定スレッドプール戦略の場合のスレッド不足など、使用される特定の戦略に基づいて望ましくない動作が発生します。これは主に、JavaにThreadPoolスレッドのハングを検出するメカニズムがないためです。

キャッシュされたスレッドプールを試すこともできますが、問題もあります。タスクの起動率が高く、一部のスレッドがハングした場合、スレッドの数が急増し、最終的にリソースの不足とメモリ不足の例外が発生する可能性があります。または、をThreadPool呼び出すカスタム戦略を使用することもできますCallerRunsPolicy。この場合も、スレッドがハングすると、最終的にすべてのスレッドがハングする可能性があります。(メインスレッドに渡されたタスクがハングし、すべてが停止する可能性があるため、メインスレッドが呼び出し元になることはありません。)

それで、解決策は何ですか?タスクレートとハングしているスレッドの数に基づいてプールサイズを調整する、それほど単純ではないThreadPoolパターンを示します。まず、ぶら下がっているスレッドを検出する問題に行きましょう。

ぶら下がっているスレッドの検出

図1は、パターンの抽象化を示しています。

ここには2つの重要なクラスがあります:ThreadManagerManagedThread。どちらもJavaThreadクラスから拡張されています。は、をThreadManager保持するコンテナを保持しManagedThreadsます。新しいものManagedThreadが作成されると、それ自体がこのコンテナに追加されます。

 ThreadHangTester testthread = new ThreadHangTester("threadhangertest",2000,false); testthread.start(); thrdManger.manage(testthread, ThreadManager.RESTART_THREAD, 10); thrdManger.start(); 

ThreadManagerこのリストを反復処理し、呼び出すManagedThreadisHung()方法を。これは基本的にタイムスタンプチェックロジックです。

 if(System.currentTimeMillis() - lastprocessingtime.get() > maxprocessingtime ) { logger.debug("Thread is hung"); return true; } 

スレッドがタスクループに入って結果を更新していないことがわかった場合は、で規定されている回復メカニズムを使用しManageThreadます。

 while(isRunning) { for (Iterator iterator = managedThreads.iterator(); iterator.hasNext();) { ManagedThreadData thrddata = (ManagedThreadData) iterator.next(); if(thrddata.getManagedThread().isHung()) { logger.warn("Thread Hang detected for ThreadName=" + thrddata.getManagedThread().getName() ); switch (thrddata.getManagedAction()) { case RESTART_THREAD: // The action here is to restart the the thread //remove from the manager iterator.remove(); //stop the processing of this thread if possible thrddata.getManagedThread().stopProcessing(); if(thrddata.getManagedThread().getClass() == ThreadHangTester.class) //To know which type of thread to create { ThreadHangTester newThread =new ThreadHangTester("restarted_ThrdHangTest",5000,true); //Create a new thread newThread.start(); //add it back to be managed manage(newThread, thrddata.getManagedAction(), thrddata.getThreadChecktime()); } break; ......... 

ManagedThreadハングしたものの代わりに新しいものを作成して使用するには、状態やコンテナを保持しないでください。このために、ManagedThread行為が分離されるべきであるコンテナ。ここでは、ENUMベースのシングルトンパターンを使用してタスクリストを保持しています。したがって、タスクを保持するコンテナは、タスクを処理するスレッドから独立しています。説明されているパターンのソースをダウンロードするには、次のリンクをクリックしてください:Javaスレッドマネージャーソース。

ハングスレッドとJavaThreadPool戦略

JavaにThreadPoolは、ハングしているスレッドを検出するメカニズムがありません。固定Executors.newFixedThreadPool()スレッドプール()のような戦略を使用しても機能しません。これは、一部のタスクが時間の経過とともにハングすると、すべてのスレッドが最終的にハング状態になるためです。別のオプションは、キャッシュされたThreadPoolポリシーを使用することです(Executors.newCachedThreadPool())。これにより、VMメモリ、CPU、およびスレッドの制限によってのみ制約され、タスクを処理するために使用可能なスレッドが常に存在することが保証されます。ただし、このポリシーでは、作成されるスレッドの数を制御することはできません。処理スレッドがハングするかどうかに関係なく、タスクレートが高いときにこのポリシーを使用すると、膨大な数のスレッドが作成されます。 JVMに十分なリソースがすぐにない場合は、最大メモリしきい値または高いCPUに到達します。スレッドの数が数百または数千に達するのはよくあることです。タスクが処理されると解放されますが、バースト処理中に多数のスレッドがシステムリソースを圧倒する場合があります。

3番目のオプションは、カスタム戦略またはポリシーを使用することです。そのようなオプションの1つは、0から最大数までスケーリングするスレッドプールを用意することです。したがって、1つのスレッドがハングした場合でも、最大スレッド数に達した限り、新しいスレッドが作成されます。

 execexec = new ThreadPoolExecutor(0, 3, 60, TimeUnit.SECONDS, new SynchronousQueue()); 

ここで、3は最大スレッド数であり、これはタスク集約型のプロセスであるため、キープアライブ時間は60秒に設定されています。十分に高い最大スレッド数を指定する場合、これは、タスクをハングさせるコンテキストで使用するのに多かれ少なかれ合理的なポリシーです。唯一の問題は、ハングしているスレッドが最終的に解放されない場合、すべてのスレッドがある時点でハングする可能性がわずかにあることです。最大スレッド数が十分に高く、タスクのハングがまれな現象であると想定している場合、このポリシーは適切です。

ThreadPoolぶら下がっている糸を検出するプラグ可能なメカニズムもあれば、それは素晴らしいことでした。そのような設計の1つについては後で説明します。もちろん、すべてのスレッドがフリーズした場合は、スレッドプールの拒否されたタスクポリシーを構成して使用できます。タスクを破棄したくない場合は、CallerRunsPolicy:を使用する必要があります。

 execexec = new ThreadPoolExecutor(0, 20, 20, TimeUnit.MILLISECONDS, new SynchronousQueue() new ThreadPoolExecutor.CallerRunsPolicy()); 

この場合、スレッドのハングによってタスクが拒否された場合、そのタスクは処理対象の呼び出し元のスレッドに渡されます。そのタスクがぶら下がる可能性は常にあります。この場合、プロセス全体がフリーズします。したがって、このコンテキストではそのようなポリシーを追加しない方がよいでしょう。

 public class NotificationProcessor implements Runnable { private final NotificationOriginator notificationOrginator; boolean isRunning = true; private final ExecutorService execexec; AlarmNotificationProcessor(NotificationOriginator norginator) { //ctor // execexec = Executors.newCachedThreadPool();// Too many threads // execexec = Executors.newFixedThreadPool(2);//, no hang tasks detection execexec = new ThreadPoolExecutor(0, 4, 250, TimeUnit.MILLISECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy()); } public void run() { while (isRunning) { try { final Task task = TaskQueue.INSTANCE.getTask(); Runnable thisTrap= new Runnable() { public void run() { ++alarmid; notificaionOrginator.notify(new OctetString(), // Task processing nbialarmnew.getOID(), nbialarmnew.createVariableBindingPayload()); É........}} ; execexec.execute(thisTrap); } 

ハング検出機能を備えたカスタムスレッドプール

タスクハングの検出と処理の機能を備えたスレッドプールライブラリがあれば素晴らしいでしょう。私はそれを開発しました、そしてそれを以下に示します。これは実際には、私が設計してしばらく前に使用したC ++スレッドプールからのポートです(リファレンスを参照)。基本的に、このソリューションはCommandパターンとChain ofResponsibilityパターンを使用します。ただし、関数オブジェクトのサポートを使用せずにJavaでコマンドパターンを実装するのは少し難しいです。このため、Javaリフレクションを使用するために実装を少し変更する必要がありました。このパターンが設計されたコンテキストは、既存のクラスを変更せずにスレッドプールをフィッティング/プラグインする必要がある場合であることに注意してください。(オブジェクト指向プログラミングの大きな利点の1つは、オープンクローズド原則を効果的に利用するようにクラスを設計する方法を提供することです。これは特に複雑な古いレガシーコードに当てはまり、関連性が低い可能性があります。新製品の開発。)したがって、コマンドパターンを実装するためにインターフェイスを使用する代わりにリフレクションを使用しました。残りのコードは、ほとんどすべてのスレッド同期およびシグナリングプリミティブがJava 1.5以降で使用可能であるため、大きな変更なしに移植できます。残りのコードは、ほとんどすべてのスレッド同期およびシグナリングプリミティブがJava 1.5以降で使用可能であるため、大きな変更なしに移植できます。残りのコードは、ほとんどすべてのスレッド同期およびシグナリングプリミティブがJava 1.5以降で使用可能であるため、大きな変更なしに移植できます。

 public class Command { private Object[ ]argParameter; ........ //Ctor for a method with two args Command(T pObj, String methodName, long timeout, String key, int arg1, int arg2) { m_objptr = pObj; m_methodName = mthodName; m_timeout = timeout; m_key = key; argParameter = new Object[2]; argParameter[0] = arg1; argParameter[1] = arg2; } // Calls the method of the object void execute() { Class klass = m_objptr.getClass(); Class[] paramTypes = new Class[]{int.class, int.class}; try { Method methodName = klass.getMethod(m_methodName, paramTypes); //System.out.println("Found the method--> " + methodName); if (argParameter.length == 2) { methodName.invoke(m_objptr, (Object) argParameter[0], (Object) argParameter[1]); } 

このパターンの使用例:

 public class CTask {.. public int DoSomething(int a, int b) {...} } 

Command cmd4 = new Command(task4, "DoMultiplication", 1, "key2",2,5);

ここにさらに2つの重要なクラスがあります。1つは、ThreadChainChain ofResponsibilityパターンを実装するクラスです。

 public class ThreadChain implements Runnable { public ThreadChain(ThreadChain p, ThreadPool pool, String name) { AddRef(); deleteMe = false; busy = false; //--> very important next = p; //set the thread chain - note this is like a linked list impl threadpool = pool; //set the thread pool - Root of the threadpool ........ threadId = ++ThreadId; ...... // start the thread thisThread = new Thread(this, name + inttid.toString()); thisThread.start(); } 

このクラスには2つの主要なメソッドがあります。1つはブール値CanHandle()で、ThreadPoolクラスによって開始され、再帰的に進行します。これにより、現在のスレッド(現在のThreadChainインスタンス)がタスクを自由に処理できるかどうかがチェックされます。すでにタスクを処理している場合は、チェーン内の次のタスクを呼び出します。

 public Boolean canHandle() { if (!busy) { //If not busy System.out.println("Can Handle This Event in id=" + threadId); // todo signal an event try { condLock.lock(); condWait.signal(); //Signal the HandleRequest which is waiting for this in the run method ......................................... return true; } ......................................... ///Else see if the next object in the chain is free /// to handle the request return next.canHandle(); 

HandleRequestはメソッドThreadChainから呼び出され、Thread run()メソッドからのシグナルを待つメソッドであることに注意してくださいcanHandle。また、コマンドパターンを介してタスクがどのように処理されるかに注意してください。