1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.thread.Timeout;
30
31
32
33
34
35 public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
36 {
37 private final SelectorManager.SelectSet _selectSet;
38 private final SelectorManager _manager;
39 private volatile Connection _connection;
40 private boolean _dispatched = false;
41 private boolean _redispatched = false;
42 private volatile boolean _writable = true;
43
44 private SelectionKey _key;
45 private int _interestOps;
46 private boolean _readBlocked;
47 private boolean _writeBlocked;
48 private boolean _open;
49 private final Timeout.Task _idleTask = new IdleTask();
50
51
52 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key)
53 {
54 super(channel);
55
56 _manager = selectSet.getManager();
57 _selectSet = selectSet;
58 _dispatched = false;
59 _redispatched = false;
60 _open=true;
61 _key = key;
62
63 _connection = _manager.newConnection(channel,this);
64 _manager.endPointOpened(this);
65
66 scheduleIdle();
67 }
68
69
70 public SelectionKey getSelectionKey()
71 {
72 synchronized (this)
73 {
74 return _key;
75 }
76 }
77
78
79 public SelectorManager getSelectManager()
80 {
81 return _manager;
82 }
83
84
85 public Connection getConnection()
86 {
87 return _connection;
88 }
89
90
91 public void setConnection(Connection connection)
92 {
93 Connection old=_connection;
94 _connection=connection;
95 _manager.endPointUpgraded(this,old);
96 }
97
98
99
100
101
102 public void schedule()
103 {
104 synchronized (this)
105 {
106
107 if (_key == null || !_key.isValid())
108 {
109 _readBlocked=false;
110 _writeBlocked=false;
111 this.notifyAll();
112 return;
113 }
114
115
116 if (_readBlocked || _writeBlocked)
117 {
118
119 if (_readBlocked && _key.isReadable())
120 _readBlocked=false;
121 if (_writeBlocked && _key.isWritable())
122 _writeBlocked=false;
123
124
125 this.notifyAll();
126
127
128 _key.interestOps(0);
129 return;
130 }
131
132
133 if (!isReadyForDispatch())
134 {
135
136 _key.interestOps(0);
137 return;
138 }
139
140
141
142 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
143 {
144
145 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
146 _key.interestOps(_interestOps);
147 _writable = true;
148 }
149
150 if (_dispatched)
151 _key.interestOps(0);
152 else
153 dispatch();
154 }
155 }
156
157
158 public void dispatch()
159 {
160 synchronized(this)
161 {
162 if (_dispatched)
163 _redispatched=true;
164 else
165 {
166 _dispatched = _manager.dispatch(this);
167 if(!_dispatched)
168 {
169 Log.warn("Dispatched Failed!");
170 updateKey();
171 }
172 }
173 }
174 }
175
176
177
178
179
180
181
182
183 private boolean undispatch()
184 {
185 synchronized (this)
186 {
187 if (_redispatched)
188 {
189 _redispatched=false;
190 return false;
191 }
192 _dispatched = false;
193 updateKey();
194 }
195 return true;
196 }
197
198
199 public void scheduleIdle()
200 {
201 _selectSet.scheduleIdle(_idleTask);
202 }
203
204
205 public void cancelIdle()
206 {
207 _selectSet.cancelIdle(_idleTask);
208 }
209
210
211 protected void idleExpired()
212 {
213 try
214 {
215 close();
216 }
217 catch (IOException e)
218 {
219 Log.ignore(e);
220 }
221 }
222
223
224
225
226 @Override
227 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
228 {
229 int l = super.flush(header, buffer, trailer);
230 if (!(_writable=l!=0))
231 {
232 synchronized (this)
233 {
234 if (!_dispatched)
235 updateKey();
236 }
237 }
238 return l;
239 }
240
241
242
243
244 @Override
245 public int flush(Buffer buffer) throws IOException
246 {
247 int l = super.flush(buffer);
248 if (!(_writable=l!=0))
249 {
250 synchronized (this)
251 {
252 if (!_dispatched)
253 updateKey();
254 }
255 }
256 return l;
257 }
258
259
260 public boolean isReadyForDispatch()
261 {
262 synchronized (this)
263 {
264 return !(_dispatched || getConnection().isSuspended());
265 }
266 }
267
268
269
270
271
272 @Override
273 public boolean blockReadable(long timeoutMs) throws IOException
274 {
275 synchronized (this)
276 {
277 long start=_selectSet.getNow();
278 try
279 {
280 _readBlocked=true;
281 while (isOpen() && _readBlocked)
282 {
283 try
284 {
285 updateKey();
286 this.wait(timeoutMs);
287
288 timeoutMs -= _selectSet.getNow()-start;
289 if (_readBlocked && timeoutMs<=0)
290 return false;
291 }
292 catch (InterruptedException e)
293 {
294 Log.warn(e);
295 }
296 }
297 }
298 finally
299 {
300 _readBlocked=false;
301 }
302 }
303 return true;
304 }
305
306
307
308
309
310 @Override
311 public boolean blockWritable(long timeoutMs) throws IOException
312 {
313 synchronized (this)
314 {
315 long start=_selectSet.getNow();
316 try
317 {
318 _writeBlocked=true;
319 while (isOpen() && _writeBlocked)
320 {
321 try
322 {
323 updateKey();
324 this.wait(timeoutMs);
325
326 timeoutMs -= _selectSet.getNow()-start;
327 if (_writeBlocked && timeoutMs<=0)
328 return false;
329 }
330 catch (InterruptedException e)
331 {
332 Log.warn(e);
333 }
334 }
335 }
336 finally
337 {
338 _writeBlocked=false;
339 }
340 }
341 return true;
342 }
343
344
345 public void setWritable(boolean writable)
346 {
347 _writable=writable;
348 }
349
350
351 public void scheduleWrite()
352 {
353 _writable=false;
354 updateKey();
355 }
356
357
358
359
360
361
362
363 private void updateKey()
364 {
365 synchronized (this)
366 {
367 int ops=-1;
368 if (getChannel().isOpen())
369 {
370 _interestOps =
371 ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ : 0)
372 | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE : 0);
373 try
374 {
375 ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
376 }
377 catch(Exception e)
378 {
379 _key=null;
380 Log.ignore(e);
381 }
382 }
383
384 if(_interestOps == ops && getChannel().isOpen())
385 return;
386
387 }
388 _selectSet.addChange(this);
389 _selectSet.wakeup();
390 }
391
392
393
394
395
396 void doUpdateKey()
397 {
398 synchronized (this)
399 {
400 if (getChannel().isOpen())
401 {
402 if (_interestOps>0)
403 {
404 if (_key==null || !_key.isValid())
405 {
406 SelectableChannel sc = (SelectableChannel)getChannel();
407 if (sc.isRegistered())
408 {
409 updateKey();
410 }
411 else
412 {
413 try
414 {
415 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
416 }
417 catch (Exception e)
418 {
419 Log.ignore(e);
420 if (_key!=null && _key.isValid())
421 {
422 _key.cancel();
423 }
424 cancelIdle();
425
426 if (_open)
427 _manager.endPointClosed(this);
428 _open=false;
429 _key = null;
430 }
431 }
432 }
433 else
434 {
435 _key.interestOps(_interestOps);
436 }
437 }
438 else
439 {
440 if (_key.isValid())
441 _key.interestOps(0);
442 else
443 _key=null;
444 }
445 }
446 else
447 {
448 if (_key!=null && _key.isValid())
449 _key.cancel();
450
451 cancelIdle();
452 if (_open)
453 _manager.endPointClosed(this);
454 _open=false;
455 _key = null;
456 }
457 }
458 }
459
460
461
462
463 public void run()
464 {
465 boolean dispatched=true;
466 try
467 {
468 while(dispatched)
469 {
470 try
471 {
472 while(true)
473 {
474 final Connection next = _connection.handle();
475 if (next!=_connection)
476 {
477 _connection=next;
478 continue;
479 }
480 break;
481 }
482 }
483 catch (ClosedChannelException e)
484 {
485 Log.ignore(e);
486 }
487 catch (EofException e)
488 {
489 Log.debug("EOF", e);
490 try{close();}
491 catch(IOException e2){Log.ignore(e2);}
492 }
493 catch (IOException e)
494 {
495 Log.warn(e.toString());
496 Log.debug(e);
497 try{close();}
498 catch(IOException e2){Log.ignore(e2);}
499 }
500 catch (Throwable e)
501 {
502 Log.warn("handle failed", e);
503 try{close();}
504 catch(IOException e2){Log.ignore(e2);}
505 }
506 dispatched=!undispatch();
507 }
508 }
509 finally
510 {
511 if (dispatched)
512 {
513 dispatched=!undispatch();
514 while (dispatched)
515 {
516 Log.warn("SCEP.run() finally DISPATCHED");
517 dispatched=!undispatch();
518 }
519 }
520 }
521 }
522
523
524
525
526
527 @Override
528 public void close() throws IOException
529 {
530 try
531 {
532 super.close();
533 }
534 catch (IOException e)
535 {
536 Log.ignore(e);
537 }
538 finally
539 {
540 updateKey();
541 }
542 }
543
544
545 @Override
546 public String toString()
547 {
548 synchronized(this)
549 {
550 return "SCEP@" + hashCode() + "\t[d=" + _dispatched + ",io=" + _interestOps+
551 ",w=" + _writable + ",b=" + _readBlocked + "|" + _writeBlocked + "]";
552 }
553 }
554
555
556 public Timeout.Task getTimeoutTask()
557 {
558 return _idleTask;
559 }
560
561
562 public SelectSet getSelectSet()
563 {
564 return _selectSet;
565 }
566
567
568
569
570 private class IdleTask extends Timeout.Task
571 {
572
573
574
575
576 @Override
577 public void expired()
578 {
579 idleExpired();
580 }
581
582 @Override
583 public String toString()
584 {
585 return "TimeoutTask:" + SelectChannelEndPoint.this.toString();
586 }
587 }
588 }