View Javadoc

1   // ========================================================================
2   // Copyright (c) 2004-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses.
12  // ========================================================================
13  
14  package org.eclipse.jetty.io.nio;
15  
16  import java.io.IOException;
17  import java.nio.channels.ClosedChannelException;
18  import java.nio.channels.SelectableChannel;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.SocketChannel;
21  
22  import org.eclipse.jetty.io.AsyncEndPoint;
23  import org.eclipse.jetty.io.Buffer;
24  import org.eclipse.jetty.io.ConnectedEndPoint;
25  import org.eclipse.jetty.io.Connection;
26  import org.eclipse.jetty.io.EofException;
27  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28  import org.eclipse.jetty.util.log.Log;
29  import org.eclipse.jetty.util.log.Logger;
30  import org.eclipse.jetty.util.thread.Timeout.Task;
31  
32  /* ------------------------------------------------------------ */
33  /**
34   * An Endpoint that can be scheduled by {@link SelectorManager}.
35   */
36  public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37  {
38      public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39  
40      private final SelectorManager.SelectSet _selectSet;
41      private final SelectorManager _manager;
42      private  SelectionKey _key;
43      private final Runnable _handler = new Runnable()
44          {
45              public void run() { handle(); }
46          };
47  
48      /** The desired value for {@link SelectionKey#interestOps()} */
49      private int _interestOps;
50  
51      /**
52       * The connection instance is the handler for any IO activity on the endpoint.
53       * There is a different type of connection for HTTP, AJP, WebSocket and
54       * ProxyConnect.   The connection may change for an SCEP as it is upgraded
55       * from HTTP to proxy connect or websocket.
56       */
57      private volatile AsyncConnection _connection;
58  
59      /** true if a thread has been dispatched to handle this endpoint */
60      private boolean _dispatched = false;
61  
62      /** true if a non IO dispatch (eg async resume) is outstanding */
63      private boolean _asyncDispatch = false;
64  
65      /** true if the last write operation succeed and wrote all offered bytes */
66      private volatile boolean _writable = true;
67  
68  
69      /** True if a thread has is blocked in {@link #blockReadable(long)} */
70      private boolean _readBlocked;
71  
72      /** True if a thread has is blocked in {@link #blockWritable(long)} */
73      private boolean _writeBlocked;
74  
75      /** true if {@link SelectSet#destroyEndPoint(SelectChannelEndPoint)} has not been called */
76      private boolean _open;
77  
78      private volatile long _idleTimestamp;
79  
80      private boolean _ishut;
81  
82      /* ------------------------------------------------------------ */
83      public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
84          throws IOException
85      {
86          super(channel, maxIdleTime);
87  
88          _manager = selectSet.getManager();
89          _selectSet = selectSet;
90          _dispatched = false;
91          _asyncDispatch = false;
92          _open=true;
93          _key = key;
94  
95          setCheckForIdle(true);
96      }
97  
98      /* ------------------------------------------------------------ */
99      public SelectionKey getSelectionKey()
100     {
101         synchronized (this)
102         {
103             return _key;
104         }
105     }
106 
107     /* ------------------------------------------------------------ */
108     public SelectorManager getSelectManager()
109     {
110         return _manager;
111     }
112 
113     /* ------------------------------------------------------------ */
114     public Connection getConnection()
115     {
116         return _connection;
117     }
118 
119     /* ------------------------------------------------------------ */
120     public void setConnection(Connection connection)
121     {
122         Connection old=_connection;
123         _connection=(AsyncConnection)connection;
124         if (old!=null && old!=_connection)
125             _manager.endPointUpgraded(this,old);
126     }
127 
128     /* ------------------------------------------------------------ */
129     public long getIdleTimestamp()
130     {
131         return _idleTimestamp;
132     }
133 
134     /* ------------------------------------------------------------ */
135     /** Called by selectSet to schedule handling
136      *
137      */
138     public void schedule()
139     {
140         synchronized (this)
141         {
142             // If there is no key, then do nothing
143             if (_key == null || !_key.isValid())
144             {
145                 _readBlocked=false;
146                 _writeBlocked=false;
147                 this.notifyAll();
148                 return;
149             }
150 
151             // If there are threads dispatched reading and writing
152             if (_readBlocked || _writeBlocked)
153             {
154                 // assert _dispatched;
155                 if (_readBlocked && _key.isReadable())
156                     _readBlocked=false;
157                 if (_writeBlocked && _key.isWritable())
158                     _writeBlocked=false;
159 
160                 // wake them up is as good as a dispatched.
161                 this.notifyAll();
162 
163                 // we are not interested in further selecting
164                 _key.interestOps(0);
165                 if (!_dispatched)
166                     updateKey();
167                 return;
168             }
169 
170             // Remove writeable op
171             if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172             {
173                 // Remove writeable op
174                 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175                 _key.interestOps(_interestOps);
176                 _writable = true; // Once writable is in ops, only removed with dispatch.
177             }
178 
179             // Dispatch if we are not already
180             if (!_dispatched)
181             {
182                 dispatch();
183                 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
184                 {
185                     _key.interestOps(0);
186                 }
187             }
188         }
189     }
190 
191     /* ------------------------------------------------------------ */
192     public void asyncDispatch()
193     {
194         synchronized(this)
195         {
196             if (_dispatched)
197                 _asyncDispatch=true;
198             else
199                 dispatch();
200         }
201     }
202 
203     /* ------------------------------------------------------------ */
204     public void dispatch()
205     {
206         synchronized(this)
207         {
208             if (_dispatched)
209             {
210                 throw new IllegalStateException("dispatched");
211             }
212             else
213             {
214                 _dispatched = true;
215                 boolean dispatched = _manager.dispatch(_handler);
216                 if(!dispatched)
217                 {
218                     _dispatched = false;
219                     LOG.warn("Dispatched Failed! "+this+" to "+_manager);
220                     updateKey();
221                 }
222             }
223         }
224     }
225 
226     /* ------------------------------------------------------------ */
227     /**
228      * Called when a dispatched thread is no longer handling the endpoint.
229      * The selection key operations are updated.
230      * @return If false is returned, the endpoint has been redispatched and
231      * thread must keep handling the endpoint.
232      */
233     protected boolean undispatch()
234     {
235         synchronized (this)
236         {
237             if (_asyncDispatch)
238             {
239                 _asyncDispatch=false;
240                 return false;
241             }
242             _dispatched = false;
243             updateKey();
244         }
245         return true;
246     }
247 
248     /* ------------------------------------------------------------ */
249     public void cancelTimeout(Task task)
250     {
251         getSelectSet().cancelTimeout(task);
252     }
253 
254     /* ------------------------------------------------------------ */
255     public void scheduleTimeout(Task task, long timeoutMs)
256     {
257         getSelectSet().scheduleTimeout(task,timeoutMs);
258     }
259 
260     /* ------------------------------------------------------------ */
261     public void setCheckForIdle(boolean check)
262     {
263         _idleTimestamp=check?System.currentTimeMillis():0;
264     }
265     
266     /* ------------------------------------------------------------ */
267     public boolean isCheckForIdle()
268     {
269         return _idleTimestamp!=0;
270     }
271     
272     /* ------------------------------------------------------------ */
273     protected void notIdle()
274     {
275         if (_idleTimestamp!=0)
276             _idleTimestamp=System.currentTimeMillis();
277     }
278     
279     /* ------------------------------------------------------------ */
280     public void checkIdleTimestamp(long now)
281     {
282         long idleTimestamp=_idleTimestamp;
283         if (!getChannel().isOpen() || idleTimestamp!=0 && _maxIdleTime>0 && now>(idleTimestamp+_maxIdleTime))
284         {
285             onIdleExpired();
286             _idleTimestamp=now;
287         }
288     }
289 
290     /* ------------------------------------------------------------ */
291     public void onIdleExpired()
292     {
293         _connection.onIdleExpired();
294     }
295 
296     /* ------------------------------------------------------------ */
297     @Override
298     public int fill(Buffer buffer) throws IOException
299     {
300         int fill=super.fill(buffer);
301         if (fill>0)
302             notIdle();
303         return fill;
304     }
305 
306     /* ------------------------------------------------------------ */
307     @Override
308     public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
309     {
310         int l = super.flush(header, buffer, trailer);
311 
312         // If there was something to write and it wasn't written, then we are not writable.
313         if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
314         {
315             synchronized (this)
316             {
317                 _writable=false;
318                 if (!_dispatched)
319                     updateKey();
320             }
321         }
322         else if (l>0)
323         {
324             _writable=true;
325             notIdle();
326         }
327         return l;
328     }
329 
330     /* ------------------------------------------------------------ */
331     /*
332      */
333     @Override
334     public int flush(Buffer buffer) throws IOException
335     {
336         int l = super.flush(buffer);
337 
338         // If there was something to write and it wasn't written, then we are not writable.
339         if (l==0 && buffer!=null && buffer.hasContent())
340         {
341             synchronized (this)
342             {
343                 _writable=false;
344                 if (!_dispatched)
345                     updateKey();
346             }
347         }
348         else if (l>0)
349         {
350             _writable=true;
351             notIdle();
352         }
353 
354         return l;
355     }
356 
357     /* ------------------------------------------------------------ */
358     /*
359      * Allows thread to block waiting for further events.
360      */
361     @Override
362     public boolean blockReadable(long timeoutMs) throws IOException
363     {
364         synchronized (this)
365         {
366             if (isInputShutdown())
367                 throw new EofException();
368 
369             long now=_selectSet.getNow();
370             long end=now+timeoutMs;
371             boolean check=isCheckForIdle();
372             setCheckForIdle(true);
373             try
374             {
375                 _readBlocked=true;
376                 while (!isInputShutdown() && _readBlocked)
377                 {
378                     try
379                     {
380                         updateKey();
381                         this.wait(timeoutMs>=0?(end-now):10000);
382                     }
383                     catch (InterruptedException e)
384                     {
385                         LOG.warn(e);
386                     }
387                     finally
388                     {
389                         now=_selectSet.getNow();
390                     }
391 
392                     if (_readBlocked && timeoutMs>0 && now>=end)
393                         return false;
394                 }
395             }
396             finally
397             {
398                 _readBlocked=false;
399                 setCheckForIdle(check);
400             }
401         }
402         return true;
403     }
404 
405     /* ------------------------------------------------------------ */
406     /*
407      * Allows thread to block waiting for further events.
408      */
409     @Override
410     public boolean blockWritable(long timeoutMs) throws IOException
411     {
412         synchronized (this)
413         {
414             if (isOutputShutdown())
415                 throw new EofException();
416 
417             long now=_selectSet.getNow();
418             long end=now+timeoutMs;
419             boolean check=isCheckForIdle();
420             setCheckForIdle(true);
421             try
422             {
423                 _writeBlocked=true;
424                 while (_writeBlocked && !isOutputShutdown())
425                 {
426                     try
427                     {
428                         updateKey();
429                         this.wait(timeoutMs>=0?(end-now):10000);
430                     }
431                     catch (InterruptedException e)
432                     {
433                         LOG.warn(e);
434                     }
435                     finally
436                     {
437                         now=_selectSet.getNow();
438                     }
439                     if (_writeBlocked && timeoutMs>0 && now>=end)
440                         return false;
441                 }
442             }
443             finally
444             {
445                 _writeBlocked=false;
446                 setCheckForIdle(check);
447             }
448         }
449         return true;
450     }
451 
452     /* ------------------------------------------------------------ */
453     /* short cut for busyselectChannelServerTest */
454     public void clearWritable()
455     {
456         _writable=false;
457     }
458 
459     /* ------------------------------------------------------------ */
460     /**
461      * @see org.eclipse.jetty.io.AsyncEndPoint#scheduleWrite()
462      */
463     public void scheduleWrite()
464     {
465         if (_writable==true)
466             LOG.debug("Required scheduleWrite {}",this);
467 
468         _writable=false;
469         updateKey();
470     }
471 
472     /* ------------------------------------------------------------ */
473     public boolean isWritable()
474     {
475         return _writable;
476     }
477 
478     /* ------------------------------------------------------------ */
479     public boolean hasProgressed()
480     {
481         return false;
482     }
483 
484     /* ------------------------------------------------------------ */
485     /**
486      * Updates selection key. Adds operations types to the selection key as needed. No operations
487      * are removed as this is only done during dispatch. This method records the new key and
488      * schedules a call to doUpdateKey to do the keyChange
489      */
490     private void updateKey()
491     {
492         final boolean changed;
493         synchronized (this)
494         {
495             int current_ops=-1;
496             if (getChannel().isOpen())
497             {
498                 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
499                 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
500 
501                 _interestOps =
502                     ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ  : 0)
503                 |   ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
504                 try
505                 {
506                     current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
507                 }
508                 catch(Exception e)
509                 {
510                     _key=null;
511                     LOG.ignore(e);
512                 }
513             }
514             changed=_interestOps!=current_ops;
515         }
516 
517         if(changed)
518         {
519             _selectSet.addChange(this);
520             _selectSet.wakeup();
521         }
522     }
523 
524 
525     /* ------------------------------------------------------------ */
526     /**
527      * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
528      */
529     void doUpdateKey()
530     {
531         synchronized (this)
532         {
533             if (getChannel().isOpen())
534             {
535                 if (_interestOps>0)
536                 {
537                     if (_key==null || !_key.isValid())
538                     {
539                         SelectableChannel sc = (SelectableChannel)getChannel();
540                         if (sc.isRegistered())
541                         {
542                             updateKey();
543                         }
544                         else
545                         {
546                             try
547                             {
548                                 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
549                             }
550                             catch (Exception e)
551                             {
552                                 LOG.ignore(e);
553                                 if (_key!=null && _key.isValid())
554                                 {
555                                     _key.cancel();
556                                 }
557 
558                                 if (_open)
559                                 {
560                                     _selectSet.destroyEndPoint(this);
561                                 }
562                                 _open=false;
563                                 _key = null;
564                             }
565                         }
566                     }
567                     else
568                     {
569                         _key.interestOps(_interestOps);
570                     }
571                 }
572                 else
573                 {
574                     if (_key!=null && _key.isValid())
575                         _key.interestOps(0);
576                     else
577                         _key=null;
578                 }
579             }
580             else
581             {
582                 if (_key!=null && _key.isValid())
583                     _key.cancel();
584 
585                 if (_open)
586                 {
587                     _open=false;
588                     _selectSet.destroyEndPoint(this);
589                 }
590                 _key = null;
591             }
592         }
593     }
594 
595     /* ------------------------------------------------------------ */
596     /*
597      */
598     protected void handle()
599     {
600         boolean dispatched=true;
601         try
602         {
603             while(dispatched)
604             {
605                 try
606                 {
607                     while(true)
608                     {
609                         final AsyncConnection next = (AsyncConnection)_connection.handle();
610                         if (next!=_connection)
611                         {
612                             LOG.debug("{} replaced {}",next,_connection);
613                             Connection old=_connection;
614                             _connection=next;
615                             _manager.endPointUpgraded(this,old);
616                             continue;
617                         }
618                         break;
619                     }
620                 }
621                 catch (ClosedChannelException e)
622                 {
623                     LOG.ignore(e);
624                 }
625                 catch (EofException e)
626                 {
627                     LOG.debug("EOF", e);
628                     try{close();}
629                     catch(IOException e2){LOG.ignore(e2);}
630                 }
631                 catch (IOException e)
632                 {
633                     LOG.warn(e.toString());
634                     LOG.debug(e);
635                     try{close();}
636                     catch(IOException e2){LOG.ignore(e2);}
637                 }
638                 catch (Throwable e)
639                 {
640                     LOG.warn("handle failed", e);
641                     try{close();}
642                     catch(IOException e2){LOG.ignore(e2);}
643                 }
644                 finally
645                 {
646                     if (!_ishut && isInputShutdown() && isOpen())
647                     {
648                         _ishut=true;
649                         try
650                         {
651                             _connection.onInputShutdown();
652                         }
653                         catch(Throwable x)
654                         {
655                             LOG.warn("onInputShutdown failed", x);
656                             try{close();}
657                             catch(IOException e2){LOG.ignore(e2);}
658                         }
659                         finally
660                         {
661                             updateKey();
662                         }
663                     }
664                     dispatched=!undispatch();
665                 }
666             }
667         }
668         finally
669         {
670             if (dispatched)
671             {
672                 dispatched=!undispatch();
673                 while (dispatched)
674                 {
675                     LOG.warn("SCEP.run() finally DISPATCHED");
676                     dispatched=!undispatch();
677                 }
678             }
679         }
680     }
681 
682     /* ------------------------------------------------------------ */
683     /*
684      * @see org.eclipse.io.nio.ChannelEndPoint#close()
685      */
686     @Override
687     public void close() throws IOException
688     {
689         try
690         {
691             super.close();
692         }
693         catch (IOException e)
694         {
695             LOG.ignore(e);
696         }
697         finally
698         {
699             updateKey();
700         }
701     }
702 
703     /* ------------------------------------------------------------ */
704     @Override
705     public String toString()
706     {
707         synchronized(this)
708         {
709             return String.format("SCEP@%x{%s->%s,d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s%s%s}",
710                     hashCode(),
711                     _socket.getRemoteSocketAddress(),
712                     _socket.getLocalSocketAddress(),
713                     _dispatched,
714                     isOpen(),
715                     isInputShutdown(),
716                     isOutputShutdown(),
717                     _readBlocked,
718                     _writeBlocked,
719                     _writable,
720                     _interestOps,
721                     _key != null && _key.isValid() ? "" : "!",
722                     _key != null && _key.isValid() && _key.isReadable() ? "r" : "",
723                     _key != null && _key.isValid() && _key.isWritable() ? "w" : "");
724         }
725     }
726 
727     /* ------------------------------------------------------------ */
728     public SelectSet getSelectSet()
729     {
730         return _selectSet;
731     }
732 
733     /* ------------------------------------------------------------ */
734     /**
735      * Don't set the SoTimeout
736      * @see org.eclipse.jetty.io.nio.ChannelEndPoint#setMaxIdleTime(int)
737      */
738     @Override
739     public void setMaxIdleTime(int timeMs) throws IOException
740     {
741         _maxIdleTime=timeMs;
742     }
743 
744 }