View Javadoc

1   //
2   //  ========================================================================
3   //  Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4   //  ------------------------------------------------------------------------
5   //  All rights reserved. This program and the accompanying materials
6   //  are made available under the terms of the Eclipse Public License v1.0
7   //  and Apache License v2.0 which accompanies this distribution.
8   //
9   //      The Eclipse Public License is available at
10  //      http://www.eclipse.org/legal/epl-v10.html
11  //
12  //      The Apache License v2.0 is available at
13  //      http://www.opensource.org/licenses/apache2.0.php
14  //
15  //  You may elect to redistribute this code under either of these licenses.
16  //  ========================================================================
17  //
18  
19  package org.eclipse.jetty.client;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  
24  import org.eclipse.jetty.http.AbstractGenerator;
25  import org.eclipse.jetty.http.HttpStatus;
26  import org.eclipse.jetty.io.Buffer;
27  import org.eclipse.jetty.io.Buffers;
28  import org.eclipse.jetty.io.Connection;
29  import org.eclipse.jetty.io.EndPoint;
30  import org.eclipse.jetty.util.log.Log;
31  import org.eclipse.jetty.util.log.Logger;
32  
33  
34  /* ------------------------------------------------------------ */
35  /** Blocking HTTP Connection
36   */
37  public class BlockingHttpConnection extends AbstractHttpConnection
38  {
39      private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
40  
41      private boolean _requestComplete;
42      private Buffer _requestContentChunk;
43      private boolean _expired=false;
44  
45      BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
46      {
47          super(requestBuffers, responseBuffers, endPoint);
48      }
49  
50      protected void reset() throws IOException
51      {
52          _requestComplete = false;
53          _expired = false;
54          super.reset();
55      }
56      
57      
58      @Override
59      protected void exchangeExpired(HttpExchange exchange)
60      {
61          synchronized (this)
62          {
63             super.exchangeExpired(exchange);
64             _expired = true;
65             this.notifyAll();
66          }
67      }
68      
69      
70  
71      @Override
72      public void onIdleExpired(long idleForMs)
73      {
74          try
75          {
76              LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
77              _expired = true;
78              _endp.close();
79          }
80          catch(IOException e)
81          {
82              LOG.ignore(e);
83  
84              try
85              {
86                  _endp.close();
87              }
88              catch(IOException e2)
89              {
90                  LOG.ignore(e2);
91              }
92          }
93  
94          synchronized(this)
95          {
96              this.notifyAll();
97          }
98      }
99  
100     @Override
101     public Connection handle() throws IOException
102     {
103         Connection connection = this;
104 
105         try
106         {
107             boolean failed = false;
108 
109 
110             // While we are making progress and have not changed connection
111             while (_endp.isOpen() && connection==this)
112             {
113                 LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
114 
115                 HttpExchange exchange;
116                 synchronized (this)
117                 {
118                     exchange=_exchange;
119                     while (exchange == null)
120                     {
121                         try
122                         {
123                             this.wait();
124                             exchange=_exchange;
125                             if (_expired)
126                             {
127                                 failed = true;
128                                 throw new InterruptedException();
129                             }
130 
131                         }
132                         catch (InterruptedException e)
133                         {
134                             throw new InterruptedIOException();
135                         }
136                     }
137                 }
138                 LOG.debug("exchange {}",exchange);
139 
140                 try
141                 {
142                     // Should we commit the request?
143                     if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
144                     {
145                         LOG.debug("commit");
146                         commitRequest();
147                     }
148 
149                     // Generate output
150                     while (_generator.isCommitted() && !_generator.isComplete())
151                     {
152                         if (_generator.flushBuffer()>0)
153                         {
154                             LOG.debug("flushed");
155                         }
156 
157                         // Is there more content to send or should we complete the generator
158                         if (_generator.isState(AbstractGenerator.STATE_CONTENT))
159                         {
160                             // Look for more content to send.
161                             if (_requestContentChunk==null)
162                                 _requestContentChunk = exchange.getRequestContentChunk(null);
163 
164                             if (_requestContentChunk==null)
165                             {
166                                 LOG.debug("complete");
167                                 _generator.complete();
168                             }
169                             else if (_generator.isEmpty())
170                             {
171                                 LOG.debug("addChunk");
172                                 Buffer chunk=_requestContentChunk;
173                                 _requestContentChunk=exchange.getRequestContentChunk(null);
174                                 _generator.addContent(chunk,_requestContentChunk==null);
175                                 if (_requestContentChunk==null)
176                                     exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
177                             }
178                         }
179                     }
180 
181                     // Signal request completion
182                     if (_generator.isComplete() && !_requestComplete)
183                     {
184                         LOG.debug("requestComplete");
185                         _requestComplete = true;
186                         exchange.getEventListener().onRequestComplete();
187                     }
188 
189                     // Read any input that is available
190                     if (!_parser.isComplete() && _parser.parseAvailable())
191                     {
192                         LOG.debug("parsed");
193                     }
194 
195                     // Flush output
196                     _endp.flush();
197                 }
198                 catch (Throwable e)
199                 {
200                     LOG.debug("Failure on " + _exchange, e);
201 
202                     failed = true;
203 
204                     synchronized (this)
205                     {
206                         if (exchange != null)
207                         {
208                             // Cancelling the exchange causes an exception as we close the connection,
209                             // but we don't report it as it is normal cancelling operation
210                             if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
211                                     exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
212                                     !exchange.isDone())
213                             {
214                                 if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
215                                     exchange.getEventListener().onException(e);
216                             }
217                         }
218                         else
219                         {
220                             if (e instanceof IOException)
221                                 throw (IOException)e;
222                             if (e instanceof Error)
223                                 throw (Error)e;
224                             if (e instanceof RuntimeException)
225                                 throw (RuntimeException)e;
226                             throw new RuntimeException(e);
227                         }
228                     }
229                 }
230                 finally
231                 {
232                     LOG.debug("{} {}",_generator, _parser);
233                     LOG.debug("{}",_endp);
234 
235                     boolean complete = failed || _generator.isComplete() && _parser.isComplete();
236 
237                     if (complete)
238                     {
239                         boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
240                         _generator.setPersistent(persistent);
241                         reset();
242                         if (persistent)
243                             _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
244 
245                         synchronized (this)
246                         {
247                             exchange=_exchange;
248                             _exchange = null;
249 
250                             // Cancel the exchange
251                             if (exchange!=null)
252                             {
253                                 exchange.cancelTimeout(_destination.getHttpClient());
254 
255                                 // TODO should we check the exchange is done?
256                             }
257 
258                             // handle switched protocols
259                             if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
260                             {
261                                 Connection switched=exchange.onSwitchProtocol(_endp);
262                                 if (switched!=null)
263                                     connection=switched;
264                                 {
265                                     // switched protocol!
266                                     _pipeline = null;
267                                     if (_pipeline!=null)
268                                         _destination.send(_pipeline);
269                                     _pipeline = null;
270 
271                                     connection=switched;
272                                 }
273                             }
274 
275                             // handle pipelined requests
276                             if (_pipeline!=null)
277                             {
278                                 if (!persistent || connection!=this)
279                                     _destination.send(_pipeline);
280                                 else
281                                     _exchange=_pipeline;
282                                 _pipeline=null;
283                             }
284 
285                             if (_exchange==null && !isReserved())  // TODO how do we return switched connections?
286                                 _destination.returnConnection(this, !persistent);
287                         }
288                     }
289                 }
290             }
291         }
292         finally
293         {
294             _parser.returnBuffers();
295             _generator.returnBuffers();
296         }
297 
298         return connection;
299     }
300 
301     @Override
302     public boolean send(HttpExchange ex) throws IOException
303     {
304         boolean sent=super.send(ex);
305         if (sent)
306         {
307             synchronized (this)
308             {
309                 notifyAll();
310             }
311         }
312         return sent;
313     }
314 }