1
2
3
4
5
6
7
8
9
10
11
12
13
14 package org.eclipse.jetty.io.nio;
15
16 import java.io.IOException;
17 import java.nio.channels.ClosedChannelException;
18 import java.nio.channels.SelectableChannel;
19 import java.nio.channels.SelectionKey;
20 import java.nio.channels.SocketChannel;
21
22 import org.eclipse.jetty.io.AsyncEndPoint;
23 import org.eclipse.jetty.io.Buffer;
24 import org.eclipse.jetty.io.ConnectedEndPoint;
25 import org.eclipse.jetty.io.Connection;
26 import org.eclipse.jetty.io.EofException;
27 import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
28 import org.eclipse.jetty.util.log.Log;
29 import org.eclipse.jetty.util.log.Logger;
30 import org.eclipse.jetty.util.thread.Timeout.Task;
31
32
33
34
35
36 public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPoint, ConnectedEndPoint
37 {
38 public static final Logger LOG=Log.getLogger("org.eclipse.jetty.io.nio");
39
40 private final SelectorManager.SelectSet _selectSet;
41 private final SelectorManager _manager;
42 private SelectionKey _key;
43 private final Runnable _handler = new Runnable()
44 {
45 public void run() { handle(); }
46 };
47
48
49 private int _interestOps;
50
51
52
53
54
55
56
57 private volatile AsyncConnection _connection;
58
59
60 private boolean _dispatched = false;
61
62
63 private boolean _asyncDispatch = false;
64
65
66 private volatile boolean _writable = true;
67
68
69
70 private boolean _readBlocked;
71
72
73 private boolean _writeBlocked;
74
75
76 private boolean _open;
77
78 private volatile long _idleTimestamp;
79
80 private boolean _ishut;
81
82
83 public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
84 throws IOException
85 {
86 super(channel, maxIdleTime);
87
88 _manager = selectSet.getManager();
89 _selectSet = selectSet;
90 _dispatched = false;
91 _asyncDispatch = false;
92 _open=true;
93 _key = key;
94
95 setCheckForIdle(true);
96 }
97
98
99 public SelectionKey getSelectionKey()
100 {
101 synchronized (this)
102 {
103 return _key;
104 }
105 }
106
107
108 public SelectorManager getSelectManager()
109 {
110 return _manager;
111 }
112
113
114 public Connection getConnection()
115 {
116 return _connection;
117 }
118
119
120 public void setConnection(Connection connection)
121 {
122 Connection old=_connection;
123 _connection=(AsyncConnection)connection;
124 if (old!=null && old!=_connection)
125 _manager.endPointUpgraded(this,old);
126 }
127
128
129 public long getIdleTimestamp()
130 {
131 return _idleTimestamp;
132 }
133
134
135
136
137
138 public void schedule()
139 {
140 synchronized (this)
141 {
142
143 if (_key == null || !_key.isValid())
144 {
145 _readBlocked=false;
146 _writeBlocked=false;
147 this.notifyAll();
148 return;
149 }
150
151
152 if (_readBlocked || _writeBlocked)
153 {
154
155 if (_readBlocked && _key.isReadable())
156 _readBlocked=false;
157 if (_writeBlocked && _key.isWritable())
158 _writeBlocked=false;
159
160
161 this.notifyAll();
162
163
164 _key.interestOps(0);
165 if (!_dispatched)
166 updateKey();
167 return;
168 }
169
170
171 if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
172 {
173
174 _interestOps = _key.interestOps() & ~SelectionKey.OP_WRITE;
175 _key.interestOps(_interestOps);
176 _writable = true;
177 }
178
179
180 if (!_dispatched)
181 {
182 dispatch();
183 if (_dispatched && !_selectSet.getManager().isDeferringInterestedOps0())
184 {
185 _key.interestOps(0);
186 }
187 }
188 }
189 }
190
191
192 public void asyncDispatch()
193 {
194 synchronized(this)
195 {
196 if (_dispatched)
197 _asyncDispatch=true;
198 else
199 dispatch();
200 }
201 }
202
203
204 public void dispatch()
205 {
206 synchronized(this)
207 {
208 if (_dispatched)
209 {
210 throw new IllegalStateException("dispatched");
211 }
212 else
213 {
214 _dispatched = true;
215 boolean dispatched = _manager.dispatch(_handler);
216 if(!dispatched)
217 {
218 _dispatched = false;
219 LOG.warn("Dispatched Failed! "+this+" to "+_manager);
220 updateKey();
221 }
222 }
223 }
224 }
225
226
227
228
229
230
231
232
233 protected boolean undispatch()
234 {
235 synchronized (this)
236 {
237 if (_asyncDispatch)
238 {
239 _asyncDispatch=false;
240 return false;
241 }
242 _dispatched = false;
243 updateKey();
244 }
245 return true;
246 }
247
248
249 public void cancelTimeout(Task task)
250 {
251 getSelectSet().cancelTimeout(task);
252 }
253
254
255 public void scheduleTimeout(Task task, long timeoutMs)
256 {
257 getSelectSet().scheduleTimeout(task,timeoutMs);
258 }
259
260
261 public void setCheckForIdle(boolean check)
262 {
263 _idleTimestamp=check?System.currentTimeMillis():0;
264 }
265
266
267 public boolean isCheckForIdle()
268 {
269 return _idleTimestamp!=0;
270 }
271
272
273 protected void notIdle()
274 {
275 if (_idleTimestamp!=0)
276 _idleTimestamp=System.currentTimeMillis();
277 }
278
279
280 public void checkIdleTimestamp(long now)
281 {
282 long idleTimestamp=_idleTimestamp;
283 if (!getChannel().isOpen() || idleTimestamp!=0 && _maxIdleTime>0 && now>(idleTimestamp+_maxIdleTime))
284 {
285 onIdleExpired();
286 _idleTimestamp=now;
287 }
288 }
289
290
291 public void onIdleExpired()
292 {
293 _connection.onIdleExpired();
294 }
295
296
297 @Override
298 public int fill(Buffer buffer) throws IOException
299 {
300 int fill=super.fill(buffer);
301 if (fill>0)
302 notIdle();
303 return fill;
304 }
305
306
307 @Override
308 public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
309 {
310 int l = super.flush(header, buffer, trailer);
311
312
313 if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
314 {
315 synchronized (this)
316 {
317 _writable=false;
318 if (!_dispatched)
319 updateKey();
320 }
321 }
322 else if (l>0)
323 {
324 _writable=true;
325 notIdle();
326 }
327 return l;
328 }
329
330
331
332
333 @Override
334 public int flush(Buffer buffer) throws IOException
335 {
336 int l = super.flush(buffer);
337
338
339 if (l==0 && buffer!=null && buffer.hasContent())
340 {
341 synchronized (this)
342 {
343 _writable=false;
344 if (!_dispatched)
345 updateKey();
346 }
347 }
348 else if (l>0)
349 {
350 _writable=true;
351 notIdle();
352 }
353
354 return l;
355 }
356
357
358
359
360
361 @Override
362 public boolean blockReadable(long timeoutMs) throws IOException
363 {
364 synchronized (this)
365 {
366 if (isInputShutdown())
367 throw new EofException();
368
369 long now=_selectSet.getNow();
370 long end=now+timeoutMs;
371 boolean check=isCheckForIdle();
372 setCheckForIdle(true);
373 try
374 {
375 _readBlocked=true;
376 while (!isInputShutdown() && _readBlocked)
377 {
378 try
379 {
380 updateKey();
381 this.wait(timeoutMs>=0?(end-now):10000);
382 }
383 catch (InterruptedException e)
384 {
385 LOG.warn(e);
386 }
387 finally
388 {
389 now=_selectSet.getNow();
390 }
391
392 if (_readBlocked && timeoutMs>0 && now>=end)
393 return false;
394 }
395 }
396 finally
397 {
398 _readBlocked=false;
399 setCheckForIdle(check);
400 }
401 }
402 return true;
403 }
404
405
406
407
408
409 @Override
410 public boolean blockWritable(long timeoutMs) throws IOException
411 {
412 synchronized (this)
413 {
414 if (isOutputShutdown())
415 throw new EofException();
416
417 long now=_selectSet.getNow();
418 long end=now+timeoutMs;
419 boolean check=isCheckForIdle();
420 setCheckForIdle(true);
421 try
422 {
423 _writeBlocked=true;
424 while (_writeBlocked && !isOutputShutdown())
425 {
426 try
427 {
428 updateKey();
429 this.wait(timeoutMs>=0?(end-now):10000);
430 }
431 catch (InterruptedException e)
432 {
433 LOG.warn(e);
434 }
435 finally
436 {
437 now=_selectSet.getNow();
438 }
439 if (_writeBlocked && timeoutMs>0 && now>=end)
440 return false;
441 }
442 }
443 finally
444 {
445 _writeBlocked=false;
446 setCheckForIdle(check);
447 }
448 }
449 return true;
450 }
451
452
453
454 public void clearWritable()
455 {
456 _writable=false;
457 }
458
459
460
461
462
463 public void scheduleWrite()
464 {
465 if (_writable==true)
466 LOG.debug("Required scheduleWrite {}",this);
467
468 _writable=false;
469 updateKey();
470 }
471
472
473 public boolean isWritable()
474 {
475 return _writable;
476 }
477
478
479 public boolean hasProgressed()
480 {
481 return false;
482 }
483
484
485
486
487
488
489
490 private void updateKey()
491 {
492 final boolean changed;
493 synchronized (this)
494 {
495 int current_ops=-1;
496 if (getChannel().isOpen())
497 {
498 boolean read_interest = _readBlocked || (!_dispatched && !_connection.isSuspended());
499 boolean write_interest= _writeBlocked || (!_dispatched && !_writable);
500
501 _interestOps =
502 ((!_socket.isInputShutdown() && read_interest ) ? SelectionKey.OP_READ : 0)
503 | ((!_socket.isOutputShutdown()&& write_interest) ? SelectionKey.OP_WRITE : 0);
504 try
505 {
506 current_ops = ((_key!=null && _key.isValid())?_key.interestOps():-1);
507 }
508 catch(Exception e)
509 {
510 _key=null;
511 LOG.ignore(e);
512 }
513 }
514 changed=_interestOps!=current_ops;
515 }
516
517 if(changed)
518 {
519 _selectSet.addChange(this);
520 _selectSet.wakeup();
521 }
522 }
523
524
525
526
527
528
529 void doUpdateKey()
530 {
531 synchronized (this)
532 {
533 if (getChannel().isOpen())
534 {
535 if (_interestOps>0)
536 {
537 if (_key==null || !_key.isValid())
538 {
539 SelectableChannel sc = (SelectableChannel)getChannel();
540 if (sc.isRegistered())
541 {
542 updateKey();
543 }
544 else
545 {
546 try
547 {
548 _key=((SelectableChannel)getChannel()).register(_selectSet.getSelector(),_interestOps,this);
549 }
550 catch (Exception e)
551 {
552 LOG.ignore(e);
553 if (_key!=null && _key.isValid())
554 {
555 _key.cancel();
556 }
557
558 if (_open)
559 {
560 _selectSet.destroyEndPoint(this);
561 }
562 _open=false;
563 _key = null;
564 }
565 }
566 }
567 else
568 {
569 _key.interestOps(_interestOps);
570 }
571 }
572 else
573 {
574 if (_key!=null && _key.isValid())
575 _key.interestOps(0);
576 else
577 _key=null;
578 }
579 }
580 else
581 {
582 if (_key!=null && _key.isValid())
583 _key.cancel();
584
585 if (_open)
586 {
587 _open=false;
588 _selectSet.destroyEndPoint(this);
589 }
590 _key = null;
591 }
592 }
593 }
594
595
596
597
598 protected void handle()
599 {
600 boolean dispatched=true;
601 try
602 {
603 while(dispatched)
604 {
605 try
606 {
607 while(true)
608 {
609 final AsyncConnection next = (AsyncConnection)_connection.handle();
610 if (next!=_connection)
611 {
612 LOG.debug("{} replaced {}",next,_connection);
613 Connection old=_connection;
614 _connection=next;
615 _manager.endPointUpgraded(this,old);
616 continue;
617 }
618 break;
619 }
620 }
621 catch (ClosedChannelException e)
622 {
623 LOG.ignore(e);
624 }
625 catch (EofException e)
626 {
627 LOG.debug("EOF", e);
628 try{close();}
629 catch(IOException e2){LOG.ignore(e2);}
630 }
631 catch (IOException e)
632 {
633 LOG.warn(e.toString());
634 LOG.debug(e);
635 try{close();}
636 catch(IOException e2){LOG.ignore(e2);}
637 }
638 catch (Throwable e)
639 {
640 LOG.warn("handle failed", e);
641 try{close();}
642 catch(IOException e2){LOG.ignore(e2);}
643 }
644 finally
645 {
646 if (!_ishut && isInputShutdown() && isOpen())
647 {
648 _ishut=true;
649 try
650 {
651 _connection.onInputShutdown();
652 }
653 catch(Throwable x)
654 {
655 LOG.warn("onInputShutdown failed", x);
656 try{close();}
657 catch(IOException e2){LOG.ignore(e2);}
658 }
659 finally
660 {
661 updateKey();
662 }
663 }
664 dispatched=!undispatch();
665 }
666 }
667 }
668 finally
669 {
670 if (dispatched)
671 {
672 dispatched=!undispatch();
673 while (dispatched)
674 {
675 LOG.warn("SCEP.run() finally DISPATCHED");
676 dispatched=!undispatch();
677 }
678 }
679 }
680 }
681
682
683
684
685
686 @Override
687 public void close() throws IOException
688 {
689 try
690 {
691 super.close();
692 }
693 catch (IOException e)
694 {
695 LOG.ignore(e);
696 }
697 finally
698 {
699 updateKey();
700 }
701 }
702
703
704 @Override
705 public String toString()
706 {
707 synchronized(this)
708 {
709 return String.format("SCEP@%x{%s->%s,d=%b,open=%b,ishut=%b,oshut=%b,rb=%b,wb=%b,w=%b,i=%d%s%s%s}",
710 hashCode(),
711 _socket.getRemoteSocketAddress(),
712 _socket.getLocalSocketAddress(),
713 _dispatched,
714 isOpen(),
715 isInputShutdown(),
716 isOutputShutdown(),
717 _readBlocked,
718 _writeBlocked,
719 _writable,
720 _interestOps,
721 _key != null && _key.isValid() ? "" : "!",
722 _key != null && _key.isValid() && _key.isReadable() ? "r" : "",
723 _key != null && _key.isValid() && _key.isWritable() ? "w" : "");
724 }
725 }
726
727
728 public SelectSet getSelectSet()
729 {
730 return _selectSet;
731 }
732
733
734
735
736
737
738 @Override
739 public void setMaxIdleTime(int timeMs) throws IOException
740 {
741 _maxIdleTime=timeMs;
742 }
743
744 }