1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22 import java.util.Arrays;
23
24 import org.eclipse.jetty.continuation.Continuation;
25 import org.eclipse.jetty.io.ConnectedEndPoint;
26 import org.eclipse.jetty.io.Connection;
27 import org.eclipse.jetty.io.EndPoint;
28 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
29 import org.eclipse.jetty.io.nio.SelectorManager;
30 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
31 import org.eclipse.jetty.server.Connector;
32 import org.eclipse.jetty.server.HttpConnection;
33 import org.eclipse.jetty.server.Request;
34 import org.eclipse.jetty.server.Server;
35 import org.eclipse.jetty.util.component.AggregateLifeCycle;
36 import org.eclipse.jetty.util.log.Log;
37 import org.eclipse.jetty.util.thread.Timeout.Task;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public class SelectChannelConnector extends AbstractNIOConnector
66 {
67 protected ServerSocketChannel _acceptChannel;
68 private int _lowResourcesConnections;
69 private int _lowResourcesMaxIdleTime;
70 private int _localPort=-1;
71
72 private final SelectorManager _manager = new ConnectorSelectorManager();
73
74
75
76
77
78
79 public SelectChannelConnector()
80 {
81 _manager.setMaxIdleTime(getMaxIdleTime());
82 setAcceptors(Math.max(1,(Runtime.getRuntime().availableProcessors()+3)/4));
83 }
84
85
86 @Override
87 public void accept(int acceptorID) throws IOException
88 {
89 ServerSocketChannel server = _acceptChannel;
90 if (server!=null && server.isOpen())
91 {
92 SocketChannel channel = _acceptChannel.accept();
93 channel.configureBlocking(false);
94 Socket socket = channel.socket();
95 configure(socket);
96 _manager.register(channel);
97 }
98 }
99
100
101 public void close() throws IOException
102 {
103 synchronized(this)
104 {
105 if(_manager.isRunning())
106 {
107 try
108 {
109 _manager.stop();
110 }
111 catch (Exception e)
112 {
113 Log.warn(e);
114 }
115 }
116 if (_acceptChannel != null)
117 _acceptChannel.close();
118 _acceptChannel = null;
119 _localPort=-2;
120 }
121 }
122
123
124 @Override
125 public void customize(EndPoint endpoint, Request request) throws IOException
126 {
127 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
128 cep.cancelIdle();
129 request.setTimeStamp(cep.getSelectSet().getNow());
130 endpoint.setMaxIdleTime(_maxIdleTime);
131 super.customize(endpoint, request);
132 }
133
134
135 @Override
136 public void persist(EndPoint endpoint) throws IOException
137 {
138 ((SelectChannelEndPoint)endpoint).scheduleIdle();
139 super.persist(endpoint);
140 }
141
142
143 public Object getConnection()
144 {
145 return _acceptChannel;
146 }
147
148
149 public int getLocalPort()
150 {
151 synchronized(this)
152 {
153 return _localPort;
154 }
155 }
156
157
158 public void open() throws IOException
159 {
160 synchronized(this)
161 {
162 if (_acceptChannel == null)
163 {
164
165 _acceptChannel = ServerSocketChannel.open();
166
167 _acceptChannel.configureBlocking(true);
168
169
170 _acceptChannel.socket().setReuseAddress(getReuseAddress());
171 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
172 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
173
174 _localPort=_acceptChannel.socket().getLocalPort();
175 if (_localPort<=0)
176 throw new IOException("Server channel not bound");
177
178 }
179 }
180 }
181
182
183 @Override
184 public void setMaxIdleTime(int maxIdleTime)
185 {
186 _manager.setMaxIdleTime(maxIdleTime);
187 super.setMaxIdleTime(maxIdleTime);
188 }
189
190
191
192
193
194 public int getLowResourcesConnections()
195 {
196 return _lowResourcesConnections;
197 }
198
199
200
201
202
203
204
205
206 public void setLowResourcesConnections(int lowResourcesConnections)
207 {
208 _lowResourcesConnections=lowResourcesConnections;
209 }
210
211
212
213
214
215 @Override
216 public int getLowResourcesMaxIdleTime()
217 {
218 return _lowResourcesMaxIdleTime;
219 }
220
221
222
223
224
225
226
227
228
229 @Override
230 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
231 {
232 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
233 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
234 }
235
236
237
238
239
240
241 @Override
242 protected void doStart() throws Exception
243 {
244 _manager.setSelectSets(getAcceptors());
245 _manager.setMaxIdleTime(getMaxIdleTime());
246 _manager.setLowResourcesConnections(getLowResourcesConnections());
247 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
248 _manager.start();
249
250 super.doStart();
251
252
253 for (int i=0;i<getAcceptors();i++)
254 {
255 final int id=i;
256 _manager.dispatch(new Runnable()
257 {
258 public void run()
259 {
260 String name=Thread.currentThread().getName();
261 try
262 {
263 Thread.currentThread().setName(name+" Selector"+id+" "+SelectChannelConnector.this);
264 while (isRunning())
265 {
266 try
267 {
268 _manager.doSelect(id);
269 }
270 catch(ThreadDeath e)
271 {
272 throw e;
273 }
274 catch(IOException e)
275 {
276 Log.ignore(e);
277 }
278 catch(Exception e)
279 {
280 Log.warn(e);
281 }
282 }
283 }
284 finally
285 {
286 Thread.currentThread().setName(name);
287 }
288 }
289 });
290 }
291 }
292
293
294
295
296
297 @Override
298 protected void doStop() throws Exception
299 {
300 super.doStop();
301 }
302
303
304 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
305 {
306 return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
307 }
308
309
310 protected void endPointClosed(SelectChannelEndPoint endpoint)
311 {
312 connectionClosed(endpoint.getConnection());
313 }
314
315
316 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
317 {
318 return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
319 }
320
321
322 public void dump(Appendable out, String indent) throws IOException
323 {
324 out.append(String.valueOf(this)).append("\n");
325 ServerSocketChannel channel=_acceptChannel;
326 if (channel==null)
327 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{null,"CLOSED",_manager}));
328 else
329 AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
330 }
331
332
333
334
335 private class SelectChannelHttpConnection extends HttpConnection
336 {
337 private final SelectChannelEndPoint _endpoint;
338
339 private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
340 {
341 super(connector,endpoint,server);
342 _endpoint = endpoint2;
343 }
344
345
346 @Override
347 public void cancelTimeout(Task task)
348 {
349 _endpoint.getSelectSet().cancelTimeout(task);
350 }
351
352
353 @Override
354 public void scheduleTimeout(Task task, long timeoutMs)
355 {
356 _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
357 }
358 }
359
360
361
362
363 private final class ConnectorSelectorManager extends SelectorManager
364 {
365 @Override
366 public boolean dispatch(Runnable task)
367 {
368 return getThreadPool().dispatch(task);
369 }
370
371 @Override
372 protected void endPointClosed(final SelectChannelEndPoint endpoint)
373 {
374 SelectChannelConnector.this.endPointClosed(endpoint);
375 }
376
377 @Override
378 protected void endPointOpened(SelectChannelEndPoint endpoint)
379 {
380
381 connectionOpened(endpoint.getConnection());
382 }
383
384 @Override
385 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
386 {
387 connectionUpgraded(oldConnection,endpoint.getConnection());
388 }
389
390 @Override
391 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
392 {
393 return SelectChannelConnector.this.newConnection(channel,endpoint);
394 }
395
396 @Override
397 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
398 {
399 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
400 }
401 }
402
403 }