1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.server.nio;
15
16 import java.io.IOException;
17 import java.net.InetSocketAddress;
18 import java.net.Socket;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.ServerSocketChannel;
21 import java.nio.channels.SocketChannel;
22
23 import org.eclipse.jetty.io.ConnectedEndPoint;
24 import org.eclipse.jetty.io.Connection;
25 import org.eclipse.jetty.io.EndPoint;
26 import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
27 import org.eclipse.jetty.io.nio.SelectorManager;
28 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
29 import org.eclipse.jetty.server.HttpConnection;
30 import org.eclipse.jetty.server.Request;
31 import org.eclipse.jetty.util.log.Log;
32 import org.eclipse.jetty.util.thread.Timeout.Task;
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 public class SelectChannelConnector extends AbstractNIOConnector
64 {
65 protected ServerSocketChannel _acceptChannel;
66 private int _lowResourcesConnections;
67 private int _lowResourcesMaxIdleTime;
68
69 private final SelectorManager _manager = new SelectorManager()
70 {
71 @Override
72 protected SocketChannel acceptChannel(SelectionKey key) throws IOException
73 {
74
75 SocketChannel channel = ((ServerSocketChannel)key.channel()).accept();
76 if (channel==null)
77 return null;
78 channel.configureBlocking(false);
79 Socket socket = channel.socket();
80 configure(socket);
81 return channel;
82 }
83
84 @Override
85 public boolean dispatch(Runnable task)
86 {
87 return getThreadPool().dispatch(task);
88 }
89
90 @Override
91 protected void endPointClosed(final SelectChannelEndPoint endpoint)
92 {
93 connectionClosed(endpoint.getConnection());
94 }
95
96 @Override
97 protected void endPointOpened(SelectChannelEndPoint endpoint)
98 {
99
100 connectionOpened(endpoint.getConnection());
101 }
102
103 @Override
104 protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
105 {
106 connectionUpgraded(oldConnection,endpoint.getConnection());
107 }
108
109 @Override
110 protected Connection newConnection(SocketChannel channel,SelectChannelEndPoint endpoint)
111 {
112 return SelectChannelConnector.this.newConnection(channel,endpoint);
113 }
114
115 @Override
116 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey sKey) throws IOException
117 {
118 return SelectChannelConnector.this.newEndPoint(channel,selectSet,sKey);
119 }
120 };
121
122
123
124
125
126
127 public SelectChannelConnector()
128 {
129 }
130
131
132 @Override
133 public void accept(int acceptorID) throws IOException
134 {
135 _manager.doSelect(acceptorID);
136 }
137
138
139 public void close() throws IOException
140 {
141 synchronized(this)
142 {
143 if(_manager.isRunning())
144 {
145 try
146 {
147 _manager.stop();
148 }
149 catch (Exception e)
150 {
151 Log.warn(e);
152 }
153 }
154 if (_acceptChannel != null)
155 _acceptChannel.close();
156 _acceptChannel = null;
157 }
158 }
159
160
161 @Override
162 public void customize(EndPoint endpoint, Request request) throws IOException
163 {
164 SelectChannelEndPoint cep = ((SelectChannelEndPoint)endpoint);
165 cep.cancelIdle();
166 request.setTimeStamp(cep.getSelectSet().getNow());
167 super.customize(endpoint, request);
168 }
169
170
171 @Override
172 public void persist(EndPoint endpoint) throws IOException
173 {
174 ((SelectChannelEndPoint)endpoint).scheduleIdle();
175 super.persist(endpoint);
176 }
177
178
179 public Object getConnection()
180 {
181 return _acceptChannel;
182 }
183
184
185 public int getLocalPort()
186 {
187 synchronized(this)
188 {
189 if (_acceptChannel==null || !_acceptChannel.isOpen())
190 return -1;
191 return _acceptChannel.socket().getLocalPort();
192 }
193 }
194
195
196 public void open() throws IOException
197 {
198 synchronized(this)
199 {
200 if (_acceptChannel == null)
201 {
202
203 _acceptChannel = ServerSocketChannel.open();
204
205 _acceptChannel.configureBlocking(true);
206
207
208 _acceptChannel.socket().setReuseAddress(getReuseAddress());
209 InetSocketAddress addr = getHost()==null?new InetSocketAddress(getPort()):new InetSocketAddress(getHost(),getPort());
210 _acceptChannel.socket().bind(addr,getAcceptQueueSize());
211
212 if (_acceptChannel.socket().getLocalPort()==-1)
213 throw new IOException("Server channel not bound");
214
215
216 _acceptChannel.configureBlocking(false);
217
218 }
219 }
220 }
221
222
223 @Override
224 public void setMaxIdleTime(int maxIdleTime)
225 {
226 _manager.setMaxIdleTime(maxIdleTime);
227 super.setMaxIdleTime(maxIdleTime);
228 }
229
230
231
232
233
234 public int getLowResourcesConnections()
235 {
236 return _lowResourcesConnections;
237 }
238
239
240
241
242
243
244
245
246 public void setLowResourcesConnections(int lowResourcesConnections)
247 {
248 _lowResourcesConnections=lowResourcesConnections;
249 }
250
251
252
253
254
255 @Override
256 public int getLowResourcesMaxIdleTime()
257 {
258 return _lowResourcesMaxIdleTime;
259 }
260
261
262
263
264
265
266
267
268
269 @Override
270 public void setLowResourcesMaxIdleTime(int lowResourcesMaxIdleTime)
271 {
272 _lowResourcesMaxIdleTime=lowResourcesMaxIdleTime;
273 super.setLowResourcesMaxIdleTime(lowResourcesMaxIdleTime);
274 }
275
276
277
278
279
280
281 @Override
282 protected void doStart() throws Exception
283 {
284 _manager.setSelectSets(getAcceptors());
285 _manager.setMaxIdleTime(getMaxIdleTime());
286 _manager.setLowResourcesConnections(getLowResourcesConnections());
287 _manager.setLowResourcesMaxIdleTime(getLowResourcesMaxIdleTime());
288 _manager.start();
289 open();
290 _manager.register(_acceptChannel);
291 super.doStart();
292 }
293
294
295
296
297
298 @Override
299 protected void doStop() throws Exception
300 {
301 super.doStop();
302 }
303
304
305 protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key) throws IOException
306 {
307 return new SelectChannelEndPoint(channel,selectSet,key);
308 }
309
310
311 protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
312 {
313 return new HttpConnection(SelectChannelConnector.this,endpoint,getServer())
314 {
315
316 @Override
317 public void cancelTimeout(Task task)
318 {
319 endpoint.getSelectSet().cancelTimeout(task);
320 }
321
322
323 @Override
324 public void scheduleTimeout(Task task, long timeoutMs)
325 {
326 endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
327 }
328 };
329 }
330
331
332 public void dump()
333 {
334 Log.info("channel "+_acceptChannel+(_acceptChannel.isOpen()?" is open":" is closed"));
335 _manager.dump();
336 }
337 }