View Javadoc

1   // ========================================================================
2   // Copyright (c) 2003-2009 Mort Bay Consulting Pty. Ltd.
3   // ------------------------------------------------------------------------
4   // All rights reserved. This program and the accompanying materials
5   // are made available under the terms of the Eclipse Public License v1.0
6   // and Apache License v2.0 which accompanies this distribution.
7   // The Eclipse Public License is available at 
8   // http://www.eclipse.org/legal/epl-v10.html
9   // The Apache License v2.0 is available at
10  // http://www.opensource.org/licenses/apache2.0.php
11  // You may elect to redistribute this code under either of these licenses. 
12  // ========================================================================
13  
14  package org.eclipse.jetty.server.nio;
15  
16  import java.io.IOException;
17  import java.net.InetSocketAddress;
18  import java.net.Socket;
19  import java.nio.channels.SelectionKey;
20  import java.nio.channels.ServerSocketChannel;
21  import java.nio.channels.SocketChannel;
22  
23  import org.eclipse.jetty.io.ConnectedEndPoint;
24  import org.eclipse.jetty.io.Connection;
25  import org.eclipse.jetty.io.EndPoint;
26  import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
27  import org.eclipse.jetty.io.nio.SelectorManager;
28  import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
29  import org.eclipse.jetty.server.HttpConnection;
30  import org.eclipse.jetty.server.Request;
31  import org.eclipse.jetty.util.log.Log;
32  import org.eclipse.jetty.util.thread.Timeout.Task;
33  
34  /* ------------------------------------------------------------------------------- */
35  /**
36   * Selecting NIO connector.
37   * <p>
38   * This connector uses efficient NIO buffers with a non blocking threading model. Direct NIO buffers
39   * are used and threads are only allocated to connections with requests. Synchronization is used to
40   * simulate blocking for the servlet API, and any unflushed content at the end of request handling
41   * is written asynchronously.
42   * </p>
43   * <p>
44   * This connector is best used when there are a many connections that have idle periods.
45   * </p>
46   * <p>
47   * When used with {@link org.eclipse.jetty.continuation.Continuation}, threadless waits are supported. When
48   * a filter or servlet calls getEvent on a Continuation, a {@link org.eclipse.jetty.server.RetryRequest}
49   * runtime exception is thrown to allow the thread to exit the current request handling. Jetty will
50   * catch this exception and will not send a response to the client. Instead the thread is released
51   * and the Continuation is placed on the timer queue. If the Continuation timeout expires, or it's
52   * resume method is called, then the request is again allocated a thread and the request is retried.
53   * The limitation of this approach is that request content is not available on the retried request,
54   * thus if possible it should be read after the continuation or saved as a request attribute or as the
55   * associated object of the Continuation instance.
56   * </p>
57   * 
58   * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
59   * 
60   * 
61   *
62   */
63  public class SelectChannelConnector extends AbstractNIOConnector 
64  {
65      protected ServerSocketChannel _acceptChannel;
66      private int _lowResourcesConnections;
67      private int _lowResourcesMaxIdleTime;
68  
69      private final SelectorManager _manager = new SelectorManager()
70      {
71          @Override
72          protected SocketChannel acceptChannel(SelectionKey key) throws IOException
73          {
74              // TODO handle max connections
75              SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
76              if (channel==null)
77                  return null;
78              channel.configureBlocking(false);
79              Socket socket = channel.socket();
80              configure(socket);
81              return channel;
82          }
83  
84          @Override
85          public boolean dispatch(Runnable task)
86          {
87              return getThreadPool().dispatch(task);
88          }
89  
90          @Override
91          protected void endPointClosed(final SelectChannelEndPoint endpoint)
92          {
93              connectionClosed(endpoint.getConnection());
94          }
95  
96          @Override
97          protected void endPointOpened(SelectChannelEndPoint endpoint)
98          {
99              // TODO handle max connections and low resources
100             connectionOpened(endpoint.getConnection());
101         }
102         
103         @Override
104         protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
105         {
106             connectionUpgraded(oldConnection,endpoint.getConnection());
107         }
108 
109         @Override
110         protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
111         {
112             return SelectChannelConnector.this.newConnection(channel,endpoint);
113         }
114 
115         @Override
116         protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
117         {
118             return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
119         }
120     };
121     
122     /* ------------------------------------------------------------------------------- */
123     /**
124      * Constructor.
125      * 
126      */
127     public SelectChannelConnector()
128     {
129     }
130     
131     /* ------------------------------------------------------------ */
132     @Override
133     public void accept(int acceptorID) throws IOException
134     {
135         _manager.doSelect(acceptorID);
136     }
137     
138     /* ------------------------------------------------------------ */
139     public void close() throws IOException
140     {
141         synchronized(this)
142         {
143             if(_manager.isRunning())
144             {
145                 try
146                 {
147                     _manager.stop();
148                 }
149                 catch (Exception e)
150                 {
151                     Log.warn(e);
152                 }
153             }
154             if (_acceptChannel != null)
155                 _acceptChannel.close();
156             _acceptChannel = null;
157         }
158     }
159     
160     /* ------------------------------------------------------------------------------- */
161     @Override
162     public void customize(EndPoint endpoint, Request request) throws IOException
163     {
164         SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
165         cep.cancelIdle();
166         request.setTimeStamp(cep.getSelectSet().getNow());
167         super.customize(endpoint, request);
168     }
169     
170     /* ------------------------------------------------------------------------------- */
171     @Override
172     public void persist(EndPoint endpoint) throws IOException
173     {
174         ((SelectChannelEndPoint)endpoint).scheduleIdle();
175         super.persist(endpoint);
176     }
177 
178     /* ------------------------------------------------------------ */
179     public Object getConnection()
180     {
181         return _acceptChannel;
182     }
183 
184     /* ------------------------------------------------------------------------------- */
185     public int getLocalPort()
186     {
187         synchronized(this)
188         {
189             if (_acceptChannel==null || !_acceptChannel.isOpen())
190                 return -1;
191             return _acceptChannel.socket().getLocalPort();
192         }
193     }
194 
195     /* ------------------------------------------------------------ */
196     public void open() throws IOException
197     {
198         synchronized(this)
199         {
200             if (_acceptChannel == null)
201             {
202                 // Create a new server socket
203                 _acceptChannel = ServerSocketChannel.open();
204                 // Set to blocking mode
205                 _acceptChannel.configureBlocking(true);
206 
207                 // Bind the server socket to the local host and port
208                 _acceptChannel.socket().setReuseAddress(getReuseAddress());
209                 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
210                 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
211 
212                 if (_acceptChannel.socket().getLocalPort()==-1)
213                     throw new IOException("Server channel not bound");
214                 
215                 // Set to non blocking mode
216                 _acceptChannel.configureBlocking(false);
217                 
218             }
219         }
220     }
221 
222     /* ------------------------------------------------------------ */
223     @Override
224     public void setMaxIdleTime(int maxIdleTime)
225     {
226         _manager.setMaxIdleTime(maxIdleTime);
227         super.setMaxIdleTime(maxIdleTime);
228     }
229 
230     /* ------------------------------------------------------------ */
231     /**
232      * @return the lowResourcesConnections
233      */
234     public int getLowResourcesConnections()
235     {
236         return _lowResourcesConnections;
237     }
238 
239     /* ------------------------------------------------------------ */
240     /**
241      * Set the number of connections, which if exceeded places this manager in low resources state.
242      * This is not an exact measure as the connection count is averaged over the select sets.
243      * @param lowResourcesConnections the number of connections
244      * @see {@link #setLowResourcesMaxIdleTime(int)}
245      */
246     public void setLowResourcesConnections(int lowResourcesConnections)
247     {
248         _lowResourcesConnections=lowResourcesConnections;
249     }
250 
251     /* ------------------------------------------------------------ */
252     /**
253      * @return the lowResourcesMaxIdleTime
254      */
255     @Override
256     public int getLowResourcesMaxIdleTime()
257     {
258         return _lowResourcesMaxIdleTime;
259     }
260 
261     /* ------------------------------------------------------------ */
262     /**
263      * Set the period in ms that a connection is allowed to be idle when this there are more
264      * than {@link #getLowResourcesConnections()} connections.  This allows the server to rapidly close idle connections
265      * in order to gracefully handle high load situations.
266      * @param lowResourcesMaxIdleTime the period in ms that a connection is allowed to be idle when resources are low.
267      * @see {@link #setMaxIdleTime(long)}
268      */
269     @Override
270     public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
271     {
272         _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
273         super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime); 
274     }
275 
276     
277     /* ------------------------------------------------------------ */
278     /*
279      * @see org.eclipse.jetty.server.server.AbstractConnector#doStart()
280      */
281     @Override
282     protected void doStart() throws Exception
283     {
284         _manager.setSelectSets(getAcceptors());
285         _manager.setMaxIdleTime(getMaxIdleTime());
286         _manager.setLowResourcesConnections(getLowResourcesConnections());
287         _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
288         _manager.start();
289         open();
290         _manager.register(_acceptChannel);
291         super.doStart();
292     }
293 
294     /* ------------------------------------------------------------ */
295     /*
296      * @see org.eclipse.jetty.server.server.AbstractConnector#doStop()
297      */
298     @Override
299     protected void doStop() throws Exception
300     {        
301         super.doStop();
302     }
303 
304     /* ------------------------------------------------------------ */
305     protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
306     {
307         return new SelectChannelEndPoint(channel,selectSet,key);
308     }
309 
310     /* ------------------------------------------------------------------------------- */
311     protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
312     {
313         return new HttpConnection(SelectChannelConnector.this,endpoint,getServer())
314         {
315             /* ------------------------------------------------------------ */
316             @Override
317             public void cancelTimeout(Task task)
318             {
319                 endpoint.getSelectSet().cancelTimeout(task);
320             }
321 
322             /* ------------------------------------------------------------ */
323             @Override
324             public void scheduleTimeout(Task task, long timeoutMs)
325             {
326                 endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
327             }
328         };
329     }
330 
331     /* ------------------------------------------------------------------------------- */
332     public void dump()
333     {
334         Log.info("channel "+_acceptChannel+(_acceptChannel.isOpen()?" is open":" is closed"));
335         _manager.dump();
336     }
337 }