1 //
2 // ========================================================================
3 // Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.client;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23
24 import org.eclipse.jetty.http.AbstractGenerator;
25 import org.eclipse.jetty.http.HttpStatus;
26 import org.eclipse.jetty.io.Buffer;
27 import org.eclipse.jetty.io.Buffers;
28 import org.eclipse.jetty.io.Connection;
29 import org.eclipse.jetty.io.EndPoint;
30 import org.eclipse.jetty.util.log.Log;
31 import org.eclipse.jetty.util.log.Logger;
32
33
34 /* ------------------------------------------------------------ */
35 /** Blocking HTTP Connection
36 */
37 public class BlockingHttpConnection extends AbstractHttpConnection
38 {
39 private static final Logger LOG = Log.getLogger(BlockingHttpConnection.class);
40
41 private boolean _requestComplete;
42 private Buffer _requestContentChunk;
43 private boolean _expired=false;
44
45 BlockingHttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endPoint)
46 {
47 super(requestBuffers, responseBuffers, endPoint);
48 }
49
50 protected void reset() throws IOException
51 {
52 _requestComplete = false;
53 _expired = false;
54 super.reset();
55 }
56
57
58 @Override
59 protected void exchangeExpired(HttpExchange exchange)
60 {
61 synchronized (this)
62 {
63 super.exchangeExpired(exchange);
64 _expired = true;
65 this.notifyAll();
66 }
67 }
68
69
70
71 @Override
72 public void onIdleExpired(long idleForMs)
73 {
74 try
75 {
76 LOG.debug("onIdleExpired {}ms {} {}",idleForMs,this,_endp);
77 _expired = true;
78 _endp.close();
79 }
80 catch(IOException e)
81 {
82 LOG.ignore(e);
83
84 try
85 {
86 _endp.close();
87 }
88 catch(IOException e2)
89 {
90 LOG.ignore(e2);
91 }
92 }
93
94 synchronized(this)
95 {
96 this.notifyAll();
97 }
98 }
99
100 @Override
101 public Connection handle() throws IOException
102 {
103 Connection connection = this;
104
105 try
106 {
107 boolean failed = false;
108
109
110 // While we are making progress and have not changed connection
111 while (_endp.isOpen() && connection==this)
112 {
113 LOG.debug("open={} more={}",_endp.isOpen(),_parser.isMoreInBuffer());
114
115 HttpExchange exchange;
116 synchronized (this)
117 {
118 exchange=_exchange;
119 while (exchange == null)
120 {
121 try
122 {
123 this.wait();
124 exchange=_exchange;
125 if (_expired)
126 {
127 failed = true;
128 throw new InterruptedException();
129 }
130
131 }
132 catch (InterruptedException e)
133 {
134 throw new InterruptedIOException();
135 }
136 }
137 }
138 LOG.debug("exchange {}",exchange);
139
140 try
141 {
142 // Should we commit the request?
143 if (!_generator.isCommitted() && exchange!=null && exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
144 {
145 LOG.debug("commit");
146 commitRequest();
147 }
148
149 // Generate output
150 while (_generator.isCommitted() && !_generator.isComplete())
151 {
152 if (_generator.flushBuffer()>0)
153 {
154 LOG.debug("flushed");
155 }
156
157 // Is there more content to send or should we complete the generator
158 if (_generator.isState(AbstractGenerator.STATE_CONTENT))
159 {
160 // Look for more content to send.
161 if (_requestContentChunk==null)
162 _requestContentChunk = exchange.getRequestContentChunk(null);
163
164 if (_requestContentChunk==null)
165 {
166 LOG.debug("complete");
167 _generator.complete();
168 }
169 else if (_generator.isEmpty())
170 {
171 LOG.debug("addChunk");
172 Buffer chunk=_requestContentChunk;
173 _requestContentChunk=exchange.getRequestContentChunk(null);
174 _generator.addContent(chunk,_requestContentChunk==null);
175 if (_requestContentChunk==null)
176 exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_RESPONSE);
177 }
178 }
179 }
180
181 // Signal request completion
182 if (_generator.isComplete() && !_requestComplete)
183 {
184 LOG.debug("requestComplete");
185 _requestComplete = true;
186 exchange.getEventListener().onRequestComplete();
187 }
188
189 // Read any input that is available
190 if (!_parser.isComplete() && _parser.parseAvailable())
191 {
192 LOG.debug("parsed");
193 }
194
195 // Flush output
196 _endp.flush();
197 }
198 catch (Throwable e)
199 {
200 LOG.debug("Failure on " + _exchange, e);
201
202 failed = true;
203
204 synchronized (this)
205 {
206 if (exchange != null)
207 {
208 // Cancelling the exchange causes an exception as we close the connection,
209 // but we don't report it as it is normal cancelling operation
210 if (exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
211 exchange.getStatus() != HttpExchange.STATUS_CANCELLED &&
212 !exchange.isDone())
213 {
214 if(exchange.setStatus(HttpExchange.STATUS_EXCEPTED))
215 exchange.getEventListener().onException(e);
216 }
217 }
218 else
219 {
220 if (e instanceof IOException)
221 throw (IOException)e;
222 if (e instanceof Error)
223 throw (Error)e;
224 if (e instanceof RuntimeException)
225 throw (RuntimeException)e;
226 throw new RuntimeException(e);
227 }
228 }
229 }
230 finally
231 {
232 LOG.debug("{} {}",_generator, _parser);
233 LOG.debug("{}",_endp);
234
235 boolean complete = failed || _generator.isComplete() && _parser.isComplete();
236
237 if (complete)
238 {
239 boolean persistent = !failed && _parser.isPersistent() && _generator.isPersistent();
240 _generator.setPersistent(persistent);
241 reset();
242 if (persistent)
243 _endp.setMaxIdleTime((int)_destination.getHttpClient().getIdleTimeout());
244
245 synchronized (this)
246 {
247 exchange=_exchange;
248 _exchange = null;
249
250 // Cancel the exchange
251 if (exchange!=null)
252 {
253 exchange.cancelTimeout(_destination.getHttpClient());
254
255 // TODO should we check the exchange is done?
256 }
257
258 // handle switched protocols
259 if (_status==HttpStatus.SWITCHING_PROTOCOLS_101)
260 {
261 Connection switched=exchange.onSwitchProtocol(_endp);
262 if (switched!=null)
263 connection=switched;
264 {
265 // switched protocol!
266 _pipeline = null;
267 if (_pipeline!=null)
268 _destination.send(_pipeline);
269 _pipeline = null;
270
271 connection=switched;
272 }
273 }
274
275 // handle pipelined requests
276 if (_pipeline!=null)
277 {
278 if (!persistent || connection!=this)
279 _destination.send(_pipeline);
280 else
281 _exchange=_pipeline;
282 _pipeline=null;
283 }
284
285 if (_exchange==null && !isReserved()) // TODO how do we return switched connections?
286 _destination.returnConnection(this, !persistent);
287 }
288 }
289 }
290 }
291 }
292 finally
293 {
294 _parser.returnBuffers();
295 _generator.returnBuffers();
296 }
297
298 return connection;
299 }
300
301 @Override
302 public boolean send(HttpExchange ex) throws IOException
303 {
304 boolean sent=super.send(ex);
305 if (sent)
306 {
307 synchronized (this)
308 {
309 notifyAll();
310 }
311 }
312 return sent;
313 }
314 }