View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  
15  package org.eclipse.jetty.util.thread;
16  
17  import java.io.IOException;
18  import java.util.ArrayList;
19  import java.util.Arrays;
20  import java.util.List;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import java.util.concurrent.BlockingQueue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.RejectedExecutionException;
26  import java.util.concurrent.TimeUnit;
27  import java.util.concurrent.atomic.AtomicInteger;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.eclipse.jetty.util.BlockingArrayQueue;
31  import org.eclipse.jetty.util.component.AbstractLifeCycle;
32  import org.eclipse.jetty.util.component.AggregateLifeCycle;
33  import org.eclipse.jetty.util.component.Dumpable;
34  import org.eclipse.jetty.util.component.LifeCycle;
35  import org.eclipse.jetty.util.log.Log;
36  import org.eclipse.jetty.util.log.Logger;
37  import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
38  
39  public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPool, Executor, Dumpable
40  {
41      private static final Logger LOG = Log.getLogger(QueuedThreadPool.class);
42  
43      private final AtomicInteger _threadsStarted = new AtomicInteger();
44      private final AtomicInteger _threadsIdle = new AtomicInteger();
45      private final AtomicLong _lastShrink = new AtomicLong();
46      private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();
47      private final Object _joinLock = new Object();
48      private BlockingQueue<Runnable> _jobs;
49      private String _name;
50      private int _maxIdleTimeMs=60000;
51      private int _maxThreads=254;
52      private int _minThreads=8;
53      private int _maxQueued=-1;
54      private int _priority=Thread.NORM_PRIORITY;
55      private boolean _daemon=false;
56      private int _maxStopTime=100;
57      private boolean _detailedDump=false;
58  
59      /* ------------------------------------------------------------------- */
60      /** Construct
61       */
62      public QueuedThreadPool()
63      {
64          _name="qtp"+super.hashCode();
65      }
66  
67      /* ------------------------------------------------------------------- */
68      /** Construct
69       */
70      public QueuedThreadPool(int maxThreads)
71      {
72          this();
73          setMaxThreads(maxThreads);
74      }
75  
76      /* ------------------------------------------------------------------- */
77      /** Construct
78       */
79      public QueuedThreadPool(BlockingQueue<Runnable> jobQ)
80      {
81          this();
82          _jobs=jobQ;
83          _jobs.clear();
84      }
85  
86  
87      /* ------------------------------------------------------------ */
88      @Override
89      protected void doStart() throws Exception
90      {
91          super.doStart();
92          _threadsStarted.set(0);
93  
94          if (_jobs==null)
95          {
96              _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
97                  :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
98          }
99  
100         int threads=_threadsStarted.get();
101         while (isRunning() && threads<_minThreads)
102         {
103             startThread(threads);
104             threads=_threadsStarted.get();
105         }
106     }
107 
108     /* ------------------------------------------------------------ */
109     @Override
110     protected void doStop() throws Exception
111     {
112         super.doStop();
113         long start=System.currentTimeMillis();
114 
115         // let jobs complete naturally for a while
116         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < (_maxStopTime/2))
117             Thread.sleep(1);
118 
119         // kill queued jobs and flush out idle jobs
120         _jobs.clear();
121         Runnable noop = new Runnable(){public void run(){}};
122         for  (int i=_threadsIdle.get();i-->0;)
123             _jobs.offer(noop);
124         Thread.yield();
125 
126         // interrupt remaining threads
127         if (_threadsStarted.get()>0)
128             for (Thread thread : _threads)
129                 thread.interrupt();
130 
131         // wait for remaining threads to die
132         while (_threadsStarted.get()>0 && (System.currentTimeMillis()-start) < _maxStopTime)
133         {
134             Thread.sleep(1);
135         }
136         Thread.yield();
137         int size=_threads.size();
138         if (size>0)
139         {
140             LOG.warn(size+" threads could not be stopped");
141 
142             if (size==1 || LOG.isDebugEnabled())
143             {
144                 for (Thread unstopped : _threads)
145                 {
146                     LOG.info("Couldn't stop "+unstopped);
147                     for (StackTraceElement element : unstopped.getStackTrace())
148                     {
149                         LOG.info(" at "+element);
150                     }
151                 }
152             }
153         }
154 
155         synchronized (_joinLock)
156         {
157             _joinLock.notifyAll();
158         }
159     }
160 
161     /* ------------------------------------------------------------ */
162     /**
163      * Delegated to the named or anonymous Pool.
164      */
165     public void setDaemon(boolean daemon)
166     {
167         _daemon=daemon;
168     }
169 
170     /* ------------------------------------------------------------ */
171     /** Set the maximum thread idle time.
172      * Threads that are idle for longer than this period may be
173      * stopped.
174      * Delegated to the named or anonymous Pool.
175      * @see #getMaxIdleTimeMs
176      * @param maxIdleTimeMs Max idle time in ms.
177      */
178     public void setMaxIdleTimeMs(int maxIdleTimeMs)
179     {
180         _maxIdleTimeMs=maxIdleTimeMs;
181     }
182 
183     /* ------------------------------------------------------------ */
184     /**
185      * @param stopTimeMs maximum total time that stop() will wait for threads to die.
186      */
187     public void setMaxStopTimeMs(int stopTimeMs)
188     {
189         _maxStopTime = stopTimeMs;
190     }
191 
192     /* ------------------------------------------------------------ */
193     /** Set the maximum number of threads.
194      * Delegated to the named or anonymous Pool.
195      * @see #getMaxThreads
196      * @param maxThreads maximum number of threads.
197      */
198     public void setMaxThreads(int maxThreads)
199     {
200         _maxThreads=maxThreads;
201         if (_minThreads>_maxThreads)
202             _minThreads=_maxThreads;
203     }
204 
205     /* ------------------------------------------------------------ */
206     /** Set the minimum number of threads.
207      * Delegated to the named or anonymous Pool.
208      * @see #getMinThreads
209      * @param minThreads minimum number of threads
210      */
211     public void setMinThreads(int minThreads)
212     {
213         _minThreads=minThreads;
214 
215         if (_minThreads>_maxThreads)
216             _maxThreads=_minThreads;
217 
218         int threads=_threadsStarted.get();
219         while (isStarted() && threads<_minThreads)
220         {
221             startThread(threads);
222             threads=_threadsStarted.get();
223         }
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * @param name Name of the BoundedThreadPool to use when naming Threads.
229      */
230     public void setName(String name)
231     {
232         if (isRunning())
233             throw new IllegalStateException("started");
234         _name= name;
235     }
236 
237     /* ------------------------------------------------------------ */
238     /** Set the priority of the pool threads.
239      *  @param priority the new thread priority.
240      */
241     public void setThreadsPriority(int priority)
242     {
243         _priority=priority;
244     }
245 
246     /* ------------------------------------------------------------ */
247     /**
248      * @return maximum queue size
249      */
250     public int getMaxQueued()
251     {
252         return _maxQueued;
253     }
254 
255     /* ------------------------------------------------------------ */
256     /**
257      * @param max job queue size
258      */
259     public void setMaxQueued(int max)
260     {
261         if (isRunning())
262             throw new IllegalStateException("started");
263         _maxQueued=max;
264     }
265 
266     /* ------------------------------------------------------------ */
267     /** Get the maximum thread idle time.
268      * Delegated to the named or anonymous Pool.
269      * @see #setMaxIdleTimeMs
270      * @return Max idle time in ms.
271      */
272     public int getMaxIdleTimeMs()
273     {
274         return _maxIdleTimeMs;
275     }
276 
277     /* ------------------------------------------------------------ */
278     /**
279      * @return maximum total time that stop() will wait for threads to die.
280      */
281     public int getMaxStopTimeMs()
282     {
283         return _maxStopTime;
284     }
285 
286     /* ------------------------------------------------------------ */
287     /** Set the maximum number of threads.
288      * Delegated to the named or anonymous Pool.
289      * @see #setMaxThreads
290      * @return maximum number of threads.
291      */
292     public int getMaxThreads()
293     {
294         return _maxThreads;
295     }
296 
297     /* ------------------------------------------------------------ */
298     /** Get the minimum number of threads.
299      * Delegated to the named or anonymous Pool.
300      * @see #setMinThreads
301      * @return minimum number of threads.
302      */
303     public int getMinThreads()
304     {
305         return _minThreads;
306     }
307 
308     /* ------------------------------------------------------------ */
309     /**
310      * @return The name of the BoundedThreadPool.
311      */
312     public String getName()
313     {
314         return _name;
315     }
316 
317     /* ------------------------------------------------------------ */
318     /** Get the priority of the pool threads.
319      *  @return the priority of the pool threads.
320      */
321     public int getThreadsPriority()
322     {
323         return _priority;
324     }
325 
326     /* ------------------------------------------------------------ */
327     /**
328      * Delegated to the named or anonymous Pool.
329      */
330     public boolean isDaemon()
331     {
332         return _daemon;
333     }
334 
335     /* ------------------------------------------------------------ */
336     public boolean isDetailedDump()
337     {
338         return _detailedDump;
339     }
340 
341     /* ------------------------------------------------------------ */
342     public void setDetailedDump(boolean detailedDump)
343     {
344         _detailedDump = detailedDump;
345     }
346 
347     /* ------------------------------------------------------------ */
348     public boolean dispatch(Runnable job)
349     {
350         if (isRunning())
351         {
352             final int jobQ = _jobs.size();
353             final int idle = getIdleThreads();
354             if(_jobs.offer(job))
355             {
356                 // If we had no idle threads or the jobQ is greater than the idle threads
357                 if (idle==0 || jobQ>idle)
358                 {
359                     int threads=_threadsStarted.get();
360                     if (threads<_maxThreads)
361                         startThread(threads);
362                 }
363                 return true;
364             }
365         }
366         return false;
367     }
368 
369     /* ------------------------------------------------------------ */
370     public void execute(Runnable job)
371     {
372         if (!dispatch(job))
373             throw new RejectedExecutionException();
374     }
375 
376     /* ------------------------------------------------------------ */
377     /**
378      * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
379      */
380     public void join() throws InterruptedException
381     {
382         synchronized (_joinLock)
383         {
384             while (isRunning())
385                 _joinLock.wait();
386         }
387 
388         while (isStopping())
389             Thread.sleep(1);
390     }
391 
392     /* ------------------------------------------------------------ */
393     /**
394      * @return The total number of threads currently in the pool
395      */
396     public int getThreads()
397     {
398         return _threadsStarted.get();
399     }
400 
401     /* ------------------------------------------------------------ */
402     /**
403      * @return The number of idle threads in the pool
404      */
405     public int getIdleThreads()
406     {
407         return _threadsIdle.get();
408     }
409 
410     /* ------------------------------------------------------------ */
411     /**
412      * @return True if the pool is at maxThreads and there are not more idle threads than queued jobs
413      */
414     public boolean isLowOnThreads()
415     {
416         return _threadsStarted.get()==_maxThreads && _jobs.size()>=_threadsIdle.get();
417     }
418 
419     /* ------------------------------------------------------------ */
420     private boolean startThread(int threads)
421     {
422         final int next=threads+1;
423         if (!_threadsStarted.compareAndSet(threads,next))
424             return false;
425 
426         boolean started=false;
427         try
428         {
429             Thread thread=newThread(_runnable);
430             thread.setDaemon(_daemon);
431             thread.setPriority(_priority);
432             thread.setName(_name+"-"+thread.getId());
433             _threads.add(thread);
434 
435             thread.start();
436             started=true;
437         }
438         finally
439         {
440             if (!started)
441                 _threadsStarted.decrementAndGet();
442         }
443         return started;
444     }
445 
446     /* ------------------------------------------------------------ */
447     protected Thread newThread(Runnable runnable)
448     {
449         return new Thread(runnable);
450     }
451 
452 
453     /* ------------------------------------------------------------ */
454     public String dump()
455     {
456         return AggregateLifeCycle.dump(this);
457     }
458 
459     /* ------------------------------------------------------------ */
460     public void dump(Appendable out, String indent) throws IOException
461     {
462         List<Object> dump = new ArrayList<Object>(getMaxThreads());
463         for (final Thread thread: _threads)
464         {
465             final StackTraceElement[] trace=thread.getStackTrace();
466             boolean inIdleJobPoll=false;
467             for (StackTraceElement t : trace)
468             {
469                 if ("idleJobPoll".equals(t.getMethodName()))
470                 {
471                     inIdleJobPoll=true;
472                     break;
473                 }
474             }
475             final boolean idle=inIdleJobPoll;
476 
477             if (_detailedDump)
478             {
479                 dump.add(new Dumpable()
480                 {
481                     public void dump(Appendable out, String indent) throws IOException
482                     {
483                         out.append(String.valueOf(thread.getId())).append(' ').append(thread.getName()).append(' ').append(thread.getState().toString()).append(idle?" IDLE":"").append('\n');
484                         if (!idle)
485                             AggregateLifeCycle.dump(out,indent,Arrays.asList(trace));
486                     }
487 
488                     public String dump()
489                     {
490                         return null;
491                     }
492                 });
493             }
494             else
495             {
496                 dump.add(thread.getId()+" "+thread.getName()+" "+thread.getState()+" @ "+(trace.length>0?trace[0]:"???")+(idle?" IDLE":""));
497             }
498         }
499 
500         out.append(String.valueOf(this)).append("\n");
501         AggregateLifeCycle.dump(out,indent,dump);
502 
503     }
504 
505 
506     /* ------------------------------------------------------------ */
507     @Override
508     public String toString()
509     {
510         return _name+"{"+getMinThreads()+"<="+getIdleThreads()+"<="+getThreads()+"/"+getMaxThreads()+","+(_jobs==null?-1:_jobs.size())+"}#"+getState();
511     }
512 
513     /* ------------------------------------------------------------ */
514     private Runnable idleJobPoll() throws InterruptedException
515     {
516         return _jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS);
517     }
518 
519     /* ------------------------------------------------------------ */
520     private Runnable _runnable = new Runnable()
521     {
522         public void run()
523         {
524             boolean shrink=false;
525             try
526             {
527                 Runnable job=_jobs.poll();
528                 while (isRunning())
529                 {
530                     // Job loop
531                     while (job!=null && isRunning())
532                     {
533                         runJob(job);
534                         job=_jobs.poll();
535                     }
536 
537                     // Idle loop
538                     try
539                     {
540                         _threadsIdle.incrementAndGet();
541 
542                         while (isRunning() && job==null)
543                         {
544                             if (_maxIdleTimeMs<=0)
545                                 job=_jobs.take();
546                             else
547                             {
548                                 // maybe we should shrink?
549                                 final int size=_threadsStarted.get();
550                                 if (size>_minThreads)
551                                 {
552                                     long last=_lastShrink.get();
553                                     long now=System.currentTimeMillis();
554                                     if (last==0 || (now-last)>_maxIdleTimeMs)
555                                     {
556                                         shrink=_lastShrink.compareAndSet(last,now) &&
557                                         _threadsStarted.compareAndSet(size,size-1);
558                                         if (shrink)
559                                             return;
560                                     }
561                                 }
562                                 job=idleJobPoll();
563                             }
564                         }
565                     }
566                     finally
567                     {
568                         _threadsIdle.decrementAndGet();
569                     }
570                 }
571             }
572             catch(InterruptedException e)
573             {
574                 LOG.ignore(e);
575             }
576             catch(Exception e)
577             {
578                 LOG.warn(e);
579             }
580             finally
581             {
582                 if (!shrink)
583                     _threadsStarted.decrementAndGet();
584                 _threads.remove(Thread.currentThread());
585             }
586         }
587     };
588 
589     /* ------------------------------------------------------------ */
590     /**
591      * <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
592      * <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
593      *
594      * @param job the job to run
595      */
596     protected void runJob(Runnable job)
597     {
598         job.run();
599     }
600 
601     /* ------------------------------------------------------------ */
602     /**
603      * @return the job queue
604      */
605     protected BlockingQueue<Runnable> getQueue()
606     {
607         return _jobs;
608     }
609 
610     /* ------------------------------------------------------------ */
611     /**
612      * @param id The thread ID to stop.
613      * @return true if the thread was found and stopped.
614      * @deprecated Use {@link #interruptThread(long)} in preference
615      */
616     @Deprecated
617     public boolean stopThread(long id)
618     {
619         for (Thread thread: _threads)
620         {
621             if (thread.getId()==id)
622             {
623                 thread.stop();
624                 return true;
625             }
626         }
627         return false;
628     }
629 
630     /* ------------------------------------------------------------ */
631     /**
632      * @param id The thread ID to interrupt.
633      * @return true if the thread was found and interrupted.
634      */
635     public boolean interruptThread(long id)
636     {
637         for (Thread thread: _threads)
638         {
639             if (thread.getId()==id)
640             {
641                 thread.interrupt();
642                 return true;
643             }
644         }
645         return false;
646     }
647 
648     /* ------------------------------------------------------------ */
649     /**
650      * @param id The thread ID to interrupt.
651      * @return true if the thread was found and interrupted.
652      */
653     public String dumpThread(long id)
654     {
655         for (Thread thread: _threads)
656         {
657             if (thread.getId()==id)
658             {
659                 StringBuilder buf = new StringBuilder();
660                 buf.append(thread.getId()).append(" ").append(thread.getName()).append(" ").append(thread.getState()).append(":\n");
661                 for (StackTraceElement element : thread.getStackTrace())
662                     buf.append("  at ").append(element.toString()).append('\n');
663                 return buf.toString();
664             }
665         }
666         return null;
667     }
668 }