View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.thread.Timeout;
30  
31  /* ------------------------------------------------------------ */
32  /**
33   * An Endpoint that can be scheduled by {@link SelectorManager}.
34   */
35  public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
36  {
37      private final SelectorManager.SelectSet _selectSet;
38      private final SelectorManager _manager;
39      private volatile Connection _connection;
40      private boolean _dispatched = false;
41      private boolean _redispatched = false;
42      private volatile boolean _writable = true; 
43      
44      private  SelectionKey _key;
45      private int _interestOps;
46      private boolean _readBlocked;
47      private boolean _writeBlocked;
48      private boolean _open;
49      private final Timeout.Task _idleTask = new IdleTask();
50  
51      /* ------------------------------------------------------------ */
52      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
53      {
54          super(channel);
55  
56          _manager = selectSet.getManager();
57          _selectSet = selectSet;
58          _dispatched = false;
59          _redispatched = false;
60          _open=true;       
61          _key = key;
62  
63          _connection = _manager.newConnection(channel,this);
64          _manager.endPointOpened(this); 
65          
66          scheduleIdle();
67      }
68  
69      /* ------------------------------------------------------------ */
70      public SelectionKey getSelectionKey()
71      {
72          synchronized (this)
73          {
74              return _key;
75          }
76      }
77      
78      /* ------------------------------------------------------------ */
79      public SelectorManager getSelectManager()
80      {
81          return _manager;
82      }
83      
84      /* ------------------------------------------------------------ */
85      public Connection getConnection()
86      {
87          return _connection;
88      }
89      
90      /* ------------------------------------------------------------ */
91      public void setConnection(Connection connection)
92      {
93          Connection old=_connection;
94          _connection=connection;
95          _manager.endPointUpgraded(this,old);
96      }
97  
98      /* ------------------------------------------------------------ */
99      /** Called by selectSet to schedule handling
100      * 
101      */
102     public void schedule() 
103     {
104         synchronized (this)
105         {
106             // If there is no key, then do nothing
107             if (_key == null || !_key.isValid())
108             {
109                 _readBlocked=false;
110                 _writeBlocked=false;
111                 this.notifyAll();
112                 return;
113             }
114             
115             // If there are threads dispatched reading and writing
116             if (_readBlocked || _writeBlocked)
117             {
118                 // assert _dispatched;
119                 if (_readBlocked && _key.isReadable())
120                     _readBlocked=false;
121                 if (_writeBlocked && _key.isWritable())
122                     _writeBlocked=false;
123 
124                 // wake them up is as good as a dispatched.
125                 this.notifyAll();
126                 
127                 // we are not interested in further selecting
128                 _key.interestOps(0);
129                 return;
130             }
131 
132             // Otherwise if we are still dispatched
133             if (!isReadyForDispatch())
134             {
135                 // we are not interested in further selecting
136                 _key.interestOps(0);
137                 return;
138             }
139 
140 
141             // Remove writeable op
142             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
143             {
144                 // Remove writeable op
145                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
146                 _key.interestOps(_interestOps);
147                 _writable = true; // Once writable is in ops, only removed with dispatch.
148             }
149 
150             if (_dispatched)
151                 _key.interestOps(0);
152             else
153                 dispatch();
154         }
155     }
156     
157     /* ------------------------------------------------------------ */
158     public void dispatch() 
159     {
160         synchronized(this)
161         {
162             if (_dispatched)
163                 _redispatched=true;
164             else
165             {
166                 _dispatched = _manager.dispatch(this);
167                 if(!_dispatched)
168                 {
169                     Log.warn("Dispatched Failed!");
170                     updateKey();
171                 }
172             }
173         }
174     }
175 
176     /* ------------------------------------------------------------ */
177     /**
178      * Called when a dispatched thread is no longer handling the endpoint. 
179      * The selection key operations are updated.
180      * @return If false is returned, the endpoint has been redispatched and 
181      * thread must keep handling the endpoint.
182      */
183     private boolean undispatch()
184     {
185         synchronized (this)
186         {
187             if (_redispatched)
188             {
189                 _redispatched=false;
190                 return false;
191             }
192             _dispatched = false;
193             updateKey();
194         }
195         return true;
196     }
197 
198     /* ------------------------------------------------------------ */
199     public void scheduleIdle()
200     {
201         _selectSet.scheduleIdle(_idleTask);
202     }
203 
204     /* ------------------------------------------------------------ */
205     public void cancelIdle()
206     {
207         _selectSet.cancelIdle(_idleTask);
208     }
209 
210     /* ------------------------------------------------------------ */
211     protected void idleExpired()
212     {
213         try
214         {
215             close();
216         }
217         catch (IOException e)
218         {
219             Log.ignore(e);
220         }
221     }
222 
223     /* ------------------------------------------------------------ */
224     /*
225      */
226     @Override
227     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
228     {
229         int l = super.flush(header, buffer, trailer);
230         if (!(_writable=l!=0))
231         {
232             synchronized (this)
233             {
234                 if (!_dispatched)
235                     updateKey();
236             }
237         }
238         return l;
239     }
240 
241     /* ------------------------------------------------------------ */
242     /*
243      */
244     @Override
245     public int flush(Buffer buffer) throws IOException
246     {
247         int l = super.flush(buffer);
248         if (!(_writable=l!=0))
249         {
250             synchronized (this)
251             {
252                 if (!_dispatched)
253                     updateKey();
254             }
255         }
256         return l;
257     }
258 
259     /* ------------------------------------------------------------ */
260     public boolean isReadyForDispatch()
261     {
262         synchronized (this)
263         {
264             return !(_dispatched || getConnection().isSuspended());
265         }
266     }
267     
268     /* ------------------------------------------------------------ */
269     /*
270      * Allows thread to block waiting for further events.
271      */
272     @Override
273     public boolean blockReadable(long timeoutMs) throws IOException
274     {
275         synchronized (this)
276         {
277             long start=_selectSet.getNow();
278             try
279             {   
280                 _readBlocked=true;
281                 while (isOpen() && _readBlocked)
282                 {
283                     try
284                     {
285                         updateKey();
286                         this.wait(timeoutMs);
287 
288                         timeoutMs -= _selectSet.getNow()-start;
289                         if (_readBlocked && timeoutMs<=0)
290                             return false;
291                     }
292                     catch (InterruptedException e)
293                     {
294                         Log.warn(e);
295                     }
296                 }
297             }
298             finally
299             {
300                 _readBlocked=false;
301             }
302         }
303         return true;
304     }
305 
306     /* ------------------------------------------------------------ */
307     /*
308      * Allows thread to block waiting for further events.
309      */
310     @Override
311     public boolean blockWritable(long timeoutMs) throws IOException
312     {
313         synchronized (this)
314         {
315             long start=_selectSet.getNow();
316             try
317             {   
318                 _writeBlocked=true;
319                 while (isOpen() && _writeBlocked)
320                 {
321                     try
322                     {
323                         updateKey();
324                         this.wait(timeoutMs);
325 
326                         timeoutMs -= _selectSet.getNow()-start;
327                         if (_writeBlocked && timeoutMs<=0)
328                             return false;
329                     }
330                     catch (InterruptedException e)
331                     {
332                         Log.warn(e);
333                     }
334                 }
335             }
336             finally
337             {
338                 _writeBlocked=false;
339             }
340         }
341         return true;
342     }
343 
344     /* ------------------------------------------------------------ */
345     public void setWritable(boolean writable)
346     {
347         _writable=writable;
348     }
349     
350     /* ------------------------------------------------------------ */
351     public void scheduleWrite()
352     {
353         _writable=false;
354         updateKey();
355     }
356     
357     /* ------------------------------------------------------------ */
358     /**
359      * Updates selection key. Adds operations types to the selection key as needed. No operations
360      * are removed as this is only done during dispatch. This method records the new key and
361      * schedules a call to doUpdateKey to do the keyChange
362      */
363     private void updateKey()
364     {
365         synchronized (this)
366         {
367             int ops=-1;
368             if (getChannel().isOpen())
369             {
370                 _interestOps = 
371                     ((!_dispatched || _readBlocked)  ? SelectionKey.OP_READ  : 0) 
372                 |   ((!_writable   || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
373                 try
374                 {
375                     ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
376                 }
377                 catch(Exception e)
378                 {
379                     _key=null;
380                     Log.ignore(e);
381                 }
382             }
383 
384             if(_interestOps == ops && getChannel().isOpen())
385                 return;
386             
387         }
388         _selectSet.addChange(this);
389         _selectSet.wakeup();
390     }
391     
392     /* ------------------------------------------------------------ */
393     /**
394      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
395      */
396     void doUpdateKey()
397     {
398         synchronized (this)
399         {
400             if (getChannel().isOpen())
401             {
402                 if (_interestOps>0)
403                 {
404                     if (_key==null || !_key.isValid())
405                     {
406                         SelectableChannel sc = (SelectableChannel)getChannel();
407                         if (sc.isRegistered())
408                         {
409                             updateKey();   
410                         }
411                         else
412                         {
413                             try
414                             {
415                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
416                             }
417                             catch (Exception e)
418                             {
419                                 Log.ignore(e);
420                                 if (_key!=null && _key.isValid())
421                                 {
422                                     _key.cancel();
423                                 }
424                                 cancelIdle();
425 
426                                 if (_open)
427                                     _manager.endPointClosed(this);
428                                 _open=false;
429                                 _key = null;
430                             }
431                         }
432                     }
433                     else
434                     {
435                         _key.interestOps(_interestOps);
436                     }
437                 }
438                 else
439                 {
440                     if (_key.isValid())
441                         _key.interestOps(0);
442                     else
443                         _key=null;
444                 }
445             }
446             else    
447             {
448                 if (_key!=null && _key.isValid())
449                     _key.cancel(); 
450                 
451                 cancelIdle();
452                 if (_open)
453                     _manager.endPointClosed(this);
454                 _open=false;
455                 _key = null;
456             }
457         }
458     }
459 
460     /* ------------------------------------------------------------ */
461     /* 
462      */
463     public void run()
464     {
465         boolean dispatched=true;
466         try
467         {
468             while(dispatched)
469             {
470                 try
471                 {
472                     while(true)
473                     {
474                         final Connection next = _connection.handle();
475                         if (next!=_connection)
476                         {  
477                             _connection=next;
478                             continue;
479                         }
480                         break;
481                     }
482                 }
483                 catch (ClosedChannelException e)
484                 {
485                     Log.ignore(e);
486                 }
487                 catch (EofException e)
488                 {
489                     Log.debug("EOF", e);
490                     try{close();}
491                     catch(IOException e2){Log.ignore(e2);}
492                 }
493                 catch (IOException e)
494                 {
495                     Log.warn(e.toString());
496                     Log.debug(e);
497                     try{close();}
498                     catch(IOException e2){Log.ignore(e2);}
499                 }
500                 catch (Throwable e)
501                 {
502                     Log.warn("handle failed", e);
503                     try{close();}
504                     catch(IOException e2){Log.ignore(e2);}
505                 }
506                 dispatched=!undispatch();
507             }
508         }
509         finally
510         {
511             if (dispatched)
512             {
513                 dispatched=!undispatch();
514                 while (dispatched)
515                 {
516                     Log.warn("SCEP.run() finally DISPATCHED");
517                     dispatched=!undispatch();
518                 }
519             }
520         }
521     }
522 
523     /* ------------------------------------------------------------ */
524     /*
525      * @see org.eclipse.io.nio.ChannelEndPoint#close()
526      */
527     @Override
528     public void close() throws IOException
529     {
530         try
531         {
532             super.close();
533         }
534         catch (IOException e)
535         {
536             Log.ignore(e);
537         }   
538         finally
539         {
540             updateKey();
541         }
542     }
543     
544     /* ------------------------------------------------------------ */
545     @Override
546     public String toString()
547     {
548         synchronized(this)
549         {
550             return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
551             ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
552         }
553     }
554 
555     /* ------------------------------------------------------------ */
556     public Timeout.Task getTimeoutTask()
557     {
558         return _idleTask;
559     }
560 
561     /* ------------------------------------------------------------ */
562     public SelectSet getSelectSet()
563     {
564         return _selectSet;
565     }
566 
567     /* ------------------------------------------------------------ */
568     /* ------------------------------------------------------------ */
569     /* ------------------------------------------------------------ */
570     private class IdleTask extends Timeout.Task
571     {
572         /* ------------------------------------------------------------ */
573         /*
574          * @see org.eclipse.thread.Timeout.Task#expire()
575          */
576         @Override
577         public void expired()
578         {
579             idleExpired();
580         }
581 
582         @Override
583         public String toString()
584         {
585             return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
586         }
587     }
588 }