1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.AsyncEndPoint;
26 import org.eclipse.jetty.io.ConnectedEndPoint;
27 import org.eclipse.jetty.io.Connection;
28 import org.eclipse.jetty.io.EndPoint;
29 import org.eclipse.jetty.io.nio.AsyncConnection;
30 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
31 import org.eclipse.jetty.io.nio.SelectorManager;
32 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
33 import org.eclipse.jetty.server.AsyncHttpConnection;
34 import org.eclipse.jetty.server.Request;
35 import org.eclipse.jetty.util.component.AggregateLifeCycle;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.log.Logger;
38 import org.eclipse.jetty.util.thread.ThreadPool;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public class SelectChannelConnector extends AbstractNIOConnector
67 {
68 private static final Logger LOG = Log.getLogger(SelectChannelConnector.class);
69
70 protected ServerSocketChannel _acceptChannel;
71 private int _lowResourcesConnections;
72 private int _lowResourcesMaxIdleTime;
73 private int _localPort=-1;
74
75 private final SelectorManager _manager = new ConnectorSelectorManager();
76
77
78
79
80
81
82 public SelectChannelConnector()
83 {
84 _manager.setMaxIdleTime(getMaxIdleTime());
85 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
86 }
87
88
89 @Override
90 public void accept(int acceptorID) throws IOException
91 {
92 ServerSocketChannel server;
93 synchronized(this)
94 {
95 server = _acceptChannel;
96 }
97
98 if (server!=null && server.isOpen() && _manager.isStarted())
99 {
100 SocketChannel channel = server.accept();
101 channel.configureBlocking(false);
102 Socket socket = channel.socket();
103 configure(socket);
104 _manager.register(channel);
105 }
106 }
107
108
109 public void close() throws IOException
110 {
111 synchronized(this)
112 {
113 if (_acceptChannel != null)
114 _acceptChannel.close();
115 _acceptChannel = null;
116 _localPort=-2;
117 }
118 }
119
120
121 @Override
122 public void customize(EndPoint endpoint, Request request) throws IOException
123 {
124 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
125 aEndp.setCheckForIdle(false);
126 request.setTimeStamp(System.currentTimeMillis());
127 endpoint.setMaxIdleTime(_maxIdleTime);
128 super.customize(endpoint, request);
129 }
130
131
132 @Override
133 public void persist(EndPoint endpoint) throws IOException
134 {
135 AsyncEndPoint aEndp = ((AsyncEndPoint)endpoint);
136 aEndp.setCheckForIdle(true);
137 super.persist(endpoint);
138 }
139
140
141 public SelectorManager getSelectorManager()
142 {
143 return _manager;
144 }
145
146
147 public synchronized Object getConnection()
148 {
149 return _acceptChannel;
150 }
151
152
153 public int getLocalPort()
154 {
155 synchronized(this)
156 {
157 return _localPort;
158 }
159 }
160
161
162 public void open() throws IOException
163 {
164 synchronized(this)
165 {
166 if (_acceptChannel == null)
167 {
168
169 _acceptChannel = ServerSocketChannel.open();
170
171 _acceptChannel.configureBlocking(true);
172
173
174 _acceptChannel.socket().setReuseAddress(getReuseAddress());
175 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
176 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
177
178 _localPort=_acceptChannel.socket().getLocalPort();
179 if (_localPort<=0)
180 throw new IOException("Server channel not bound");
181
182 }
183 }
184 }
185
186
187 @Override
188 public void setMaxIdleTime(int maxIdleTime)
189 {
190 _manager.setMaxIdleTime(maxIdleTime);
191 super.setMaxIdleTime(maxIdleTime);
192 }
193
194
195
196
197
198 public int getLowResourcesConnections()
199 {
200 return _lowResourcesConnections;
201 }
202
203
204
205
206
207
208
209
210 public void setLowResourcesConnections(int lowResourcesConnections)
211 {
212 _lowResourcesConnections=lowResourcesConnections;
213 }
214
215
216
217
218
219 @Override
220 public int getLowResourcesMaxIdleTime()
221 {
222 return _lowResourcesMaxIdleTime;
223 }
224
225
226
227
228
229
230
231
232
233 @Override
234 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
235 {
236 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
237 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
238 }
239
240
241
242
243
244
245 @Override
246 protected void doStart() throws Exception
247 {
248 _manager.setSelectSets(getAcceptors());
249 _manager.setMaxIdleTime(getMaxIdleTime());
250 _manager.setLowResourcesConnections(getLowResourcesConnections());
251 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
252
253 super.doStart();
254 _manager.start();
255 }
256
257
258
259
260
261 @Override
262 protected void doStop() throws Exception
263 {
264 synchronized(this)
265 {
266 if(_manager.isRunning())
267 {
268 try
269 {
270 _manager.stop();
271 }
272 catch (Exception e)
273 {
274 LOG.warn(e);
275 }
276 }
277 }
278 super.doStop();
279 }
280
281
282 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
283 {
284 SelectChannelEndPoint endp= new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
285 endp.setConnection(selectSet.getManager().newConnection(channel,endp, key.attachment()));
286 return endp;
287 }
288
289
290 protected void endPointClosed(SelectChannelEndPoint endpoint)
291 {
292 connectionClosed(endpoint.getConnection());
293 }
294
295
296 protected AsyncConnection newConnection(SocketChannel channel,final AsyncEndPoint endpoint)
297 {
298 return new AsyncHttpConnection(SelectChannelConnector.this,endpoint,getServer());
299 }
300
301
302 public void dump(Appendable out, String indent) throws IOException
303 {
304 super.dump(out, indent);
305 ServerSocketChannel channel;
306 synchronized (this)
307 {
308 channel=_acceptChannel;
309 }
310 if (channel==null)
311 AggregateLifeCycle.dump(out,indent,Arrays.asList(null,"CLOSED",_manager));
312 else
313 AggregateLifeCycle.dump(out,indent,Arrays.asList(channel,channel.isOpen()?"OPEN":"CLOSED",_manager));
314 }
315
316
317
318
319 private final class ConnectorSelectorManager extends SelectorManager
320 {
321 @Override
322 public boolean dispatch(Runnable task)
323 {
324 ThreadPool pool=getThreadPool();
325 if (pool==null)
326 pool=getServer().getThreadPool();
327 return pool.dispatch(task);
328 }
329
330 @Override
331 protected void endPointClosed(final SelectChannelEndPoint endpoint)
332 {
333 SelectChannelConnector.this.endPointClosed(endpoint);
334 }
335
336 @Override
337 protected void endPointOpened(SelectChannelEndPoint endpoint)
338 {
339
340 connectionOpened(endpoint.getConnection());
341 }
342
343 @Override
344 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
345 {
346 connectionUpgraded(oldConnection,endpoint.getConnection());
347 }
348
349 @Override
350 public AsyncConnection newConnection(SocketChannel channel,AsyncEndPoint endpoint, Object attachment)
351 {
352 return SelectChannelConnector.this.newConnection(channel,endpoint);
353 }
354
355 @Override
356 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
357 {
358 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
359 }
360 }
361
362 }