1 //
2 // ========================================================================
3 // Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.websocket;
20
21 import java.io.IOException;
22 import java.net.InetSocketAddress;
23 import java.net.ProtocolException;
24 import java.net.SocketAddress;
25 import java.net.URI;
26 import java.nio.channels.ByteChannel;
27 import java.nio.channels.SocketChannel;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.CopyOnWriteArrayList;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.Future;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37
38 import org.eclipse.jetty.util.log.Logger;
39
40
41 /* ------------------------------------------------------------ */
42 /**
 * <p>{@link WebSocketClient} allows creating multiple connections to multiple destinations
44 * that can speak the websocket protocol.</p>
45 * <p>When creating websocket connections, {@link WebSocketClient} accepts a {@link WebSocket}
46 * object (to receive events from the server), and returns a {@link WebSocket.Connection} to
47 * send data to the server.</p>
48 * <p>Example usage is as follows:</p>
49 * <pre>
50 * WebSocketClientFactory factory = new WebSocketClientFactory();
51 * factory.start();
52 *
53 * WebSocketClient client = factory.newWebSocketClient();
54 * // Configure the client
55 *
56 * WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
57 * {
58 * public void onOpen(Connection connection)
59 * {
60 * // open notification
61 * }
62 *
63 * public void onClose(int closeCode, String message)
64 * {
65 * // close notification
66 * }
67 *
68 * public void onMessage(String data)
69 * {
70 * // handle incoming message
71 * }
72 * }).get(5, TimeUnit.SECONDS);
73 *
74 * connection.sendMessage("Hello World");
75 * </pre>
76 */
77 public class WebSocketClient
78 {
79 private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
80
81 private final WebSocketClientFactory _factory;
82 private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
83 private final List<String> _extensions=new CopyOnWriteArrayList<String>();
84 private String _origin;
85 private String _protocol;
86 private int _maxIdleTime=-1;
87 private int _maxTextMessageSize=16*1024;
88 private int _maxBinaryMessageSize=-1;
89 private MaskGen _maskGen;
90 private SocketAddress _bindAddress;
91
92 /* ------------------------------------------------------------ */
/**
 * <p>Builds a client on top of its own, privately created {@link WebSocketClientFactory},
 * which is started before the constructor returns.</p>
 * <p>Every client built this way gets a separate factory, which can be wasteful of
 * resources if many clients are created.</p>
 *
 * @deprecated Use {@link WebSocketClientFactory#newWebSocketClient()}
 * @throws Exception if the private WebSocketClientFactory fails to start
 */
@Deprecated
public WebSocketClient() throws Exception
{
    WebSocketClientFactory privateFactory = new WebSocketClientFactory();
    privateFactory.start();
    _factory = privateFactory;
    _maskGen = privateFactory.getMaskGen();
}
107
108 /* ------------------------------------------------------------ */
/**
 * <p>Builds a client that uses the given, shared WebSocketClientFactory.</p>
 * <p>The client's initial mask generator is taken from the factory.</p>
 *
 * @param factory the shared {@link WebSocketClientFactory}
 */
public WebSocketClient(WebSocketClientFactory factory)
{
    this._factory = factory;
    this._maskGen = factory.getMaskGen();
}
119
120 /* ------------------------------------------------------------ */
121 /**
122 * @return The WebSocketClientFactory this client was created with.
123 */
124 public WebSocketClientFactory getFactory()
125 {
126 return _factory;
127 }
128
129 /* ------------------------------------------------------------ */
130 /**
131 * @return the address to bind the socket channel to
132 * @see #setBindAddress(SocketAddress)
133 */
134 public SocketAddress getBindAddress()
135 {
136 return _bindAddress;
137 }
138
139 /* ------------------------------------------------------------ */
140 /**
141 * @param bindAddress the address to bind the socket channel to
142 * @see #getBindAddress()
143 */
144 public void setBindAddress(SocketAddress bindAddress)
145 {
146 this._bindAddress = bindAddress;
147 }
148
149 /* ------------------------------------------------------------ */
150 /**
151 * @return The maxIdleTime in ms for connections opened by this client,
152 * or -1 if the default from {@link WebSocketClientFactory#getSelectorManager()} is used.
153 * @see #setMaxIdleTime(int)
154 */
155 public int getMaxIdleTime()
156 {
157 return _maxIdleTime;
158 }
159
160 /* ------------------------------------------------------------ */
161 /**
162 * @param maxIdleTime The max idle time in ms for connections opened by this client
163 * @see #getMaxIdleTime()
164 */
165 public void setMaxIdleTime(int maxIdleTime)
166 {
167 _maxIdleTime=maxIdleTime;
168 }
169
170 /* ------------------------------------------------------------ */
171 /**
172 * @return The subprotocol string for connections opened by this client.
173 * @see #setProtocol(String)
174 */
175 public String getProtocol()
176 {
177 return _protocol;
178 }
179
180 /* ------------------------------------------------------------ */
181 /**
182 * @param protocol The subprotocol string for connections opened by this client.
183 * @see #getProtocol()
184 */
185 public void setProtocol(String protocol)
186 {
187 _protocol = protocol;
188 }
189
190 /* ------------------------------------------------------------ */
191 /**
192 * @return The origin URI of the client
193 * @see #setOrigin(String)
194 */
195 public String getOrigin()
196 {
197 return _origin;
198 }
199
200 /* ------------------------------------------------------------ */
201 /**
202 * @param origin The origin URI of the client (eg "http://example.com")
203 * @see #getOrigin()
204 */
205 public void setOrigin(String origin)
206 {
207 _origin = origin;
208 }
209
210 /* ------------------------------------------------------------ */
211 /**
212 * <p>Returns the map of the cookies that are sent during the initial HTTP handshake
213 * that upgrades to the websocket protocol.</p>
214 * @return The read-write cookie map
215 */
216 public Map<String,String> getCookies()
217 {
218 return _cookies;
219 }
220
221 /* ------------------------------------------------------------ */
222 /**
223 * @return The list of websocket protocol extensions
224 */
225 public List<String> getExtensions()
226 {
227 return _extensions;
228 }
229
230 /* ------------------------------------------------------------ */
231 /**
232 * @return the mask generator to use, or null if not mask generator should be used
233 * @see #setMaskGen(MaskGen)
234 */
235 public MaskGen getMaskGen()
236 {
237 return _maskGen;
238 }
239
240 /* ------------------------------------------------------------ */
241 /**
242 * @param maskGen the mask generator to use, or null if not mask generator should be used
243 * @see #getMaskGen()
244 */
245 public void setMaskGen(MaskGen maskGen)
246 {
247 _maskGen = maskGen;
248 }
249
250 /* ------------------------------------------------------------ */
251 /**
252 * @return The initial maximum text message size (in characters) for a connection
253 */
254 public int getMaxTextMessageSize()
255 {
256 return _maxTextMessageSize;
257 }
258
259 /* ------------------------------------------------------------ */
260 /**
261 * Set the initial maximum text message size for a connection. This can be changed by
262 * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
263 * @param maxTextMessageSize The default maximum text message size (in characters) for a connection
264 */
265 public void setMaxTextMessageSize(int maxTextMessageSize)
266 {
267 _maxTextMessageSize = maxTextMessageSize;
268 }
269
270 /* ------------------------------------------------------------ */
271 /**
272 * @return The initial maximum binary message size (in bytes) for a connection
273 */
274 public int getMaxBinaryMessageSize()
275 {
276 return _maxBinaryMessageSize;
277 }
278
279 /* ------------------------------------------------------------ */
280 /**
281 * Set the initial maximum binary message size for a connection. This can be changed by
282 * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
283 * @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
284 */
285 public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
286 {
287 _maxBinaryMessageSize = maxBinaryMessageSize;
288 }
289
290 /* ------------------------------------------------------------ */
291 /**
292 * <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
293 *
294 * @param uri The URI to connect to.
295 * @param websocket The {@link WebSocket} instance to handle incoming events.
296 * @param maxConnectTime The interval to wait for a successful connection
297 * @param units the units of the maxConnectTime
298 * @return A {@link WebSocket.Connection}
299 * @throws IOException if the connection fails
300 * @throws InterruptedException if the thread is interrupted
301 * @throws TimeoutException if the timeout elapses before the connection is completed
302 * @see #open(URI, WebSocket)
303 */
304 public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
305 {
306 try
307 {
308 return open(uri,websocket).get(maxConnectTime,units);
309 }
310 catch (ExecutionException e)
311 {
312 Throwable cause = e.getCause();
313 if (cause instanceof IOException)
314 throw (IOException)cause;
315 if (cause instanceof Error)
316 throw (Error)cause;
317 if (cause instanceof RuntimeException)
318 throw (RuntimeException)cause;
319 throw new RuntimeException(cause);
320 }
321 }
322
323 /* ------------------------------------------------------------ */
324 /**
325 * <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
326 * <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
327 *
328 * @param uri The URI to connect to.
329 * @param websocket The {@link WebSocket} instance to handle incoming events.
330 * @return A {@link Future} to the {@link WebSocket.Connection}
331 * @throws IOException if the connection fails
332 * @see #open(URI, WebSocket, long, TimeUnit)
333 */
334 public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
335 {
336 if (!_factory.isStarted())
337 throw new IllegalStateException("Factory !started");
338
339 InetSocketAddress address = toSocketAddress(uri);
340
341 SocketChannel channel = SocketChannel.open();
342 if (_bindAddress != null)
343 channel.socket().bind(_bindAddress);
344 channel.socket().setTcpNoDelay(true);
345
346 WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);
347
348 channel.configureBlocking(false);
349 channel.connect(address);
350 _factory.getSelectorManager().register(channel, holder);
351
352 return holder;
353 }
354
355 public static InetSocketAddress toSocketAddress(URI uri)
356 {
357 String scheme = uri.getScheme();
358 if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
359 throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
360 int port = uri.getPort();
361 if (port == 0)
362 throw new IllegalArgumentException("Bad WebSocket port: " + port);
363 if (port < 0)
364 port = "ws".equals(scheme) ? 80 : 443;
365
366 return new InetSocketAddress(uri.getHost(), port);
367 }
368
369 /* ------------------------------------------------------------ */
370 /** The Future Websocket Connection.
371 */
372 static class WebSocketFuture implements Future<WebSocket.Connection>
373 {
374 final WebSocket _websocket;
375 final URI _uri;
376 final WebSocketClient _client;
377 final CountDownLatch _done = new CountDownLatch(1);
378 ByteChannel _channel;
379 WebSocketConnection _connection;
380 Throwable _exception;
381
382 private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
383 {
384 _websocket=websocket;
385 _uri=uri;
386 _client=client;
387 _channel=channel;
388 }
389
390 public void onConnection(WebSocketConnection connection)
391 {
392 try
393 {
394 _client.getFactory().addConnection(connection);
395
396 connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
397 connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
398
399 WebSocketConnection con;
400 synchronized (this)
401 {
402 if (_channel!=null)
403 _connection=connection;
404 con=_connection;
405 }
406
407 if (con!=null)
408 {
409 if (_websocket instanceof WebSocket.OnFrame)
410 ((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
411
412 _websocket.onOpen(con.getConnection());
413 }
414 }
415 finally
416 {
417 _done.countDown();
418 }
419 }
420
421 public void handshakeFailed(Throwable ex)
422 {
423 try
424 {
425 ByteChannel channel=null;
426 synchronized (this)
427 {
428 if (_channel!=null)
429 {
430 channel=_channel;
431 _channel=null;
432 _exception=ex;
433 }
434 }
435
436 if (channel!=null)
437 {
438 if (ex instanceof ProtocolException)
439 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
440 else
441 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
442 }
443 }
444 finally
445 {
446 _done.countDown();
447 }
448 }
449
450 public Map<String,String> getCookies()
451 {
452 return _client.getCookies();
453 }
454
455 public String getProtocol()
456 {
457 return _client.getProtocol();
458 }
459
460 public WebSocket getWebSocket()
461 {
462 return _websocket;
463 }
464
465 public URI getURI()
466 {
467 return _uri;
468 }
469
470 public int getMaxIdleTime()
471 {
472 return _client.getMaxIdleTime();
473 }
474
475 public String getOrigin()
476 {
477 return _client.getOrigin();
478 }
479
480 public MaskGen getMaskGen()
481 {
482 return _client.getMaskGen();
483 }
484
485 @Override
486 public String toString()
487 {
488 return "[" + _uri + ","+_websocket+"]@"+hashCode();
489 }
490
491 public boolean cancel(boolean mayInterruptIfRunning)
492 {
493 try
494 {
495 ByteChannel channel=null;
496 synchronized (this)
497 {
498 if (_connection==null && _exception==null && _channel!=null)
499 {
500 channel=_channel;
501 _channel=null;
502 }
503 }
504
505 if (channel!=null)
506 {
507 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
508 return true;
509 }
510 return false;
511 }
512 finally
513 {
514 _done.countDown();
515 }
516 }
517
518 public boolean isCancelled()
519 {
520 synchronized (this)
521 {
522 return _channel==null && _connection==null;
523 }
524 }
525
526 public boolean isDone()
527 {
528 synchronized (this)
529 {
530 return _connection!=null && _exception==null;
531 }
532 }
533
534 public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
535 {
536 try
537 {
538 return get(Long.MAX_VALUE,TimeUnit.SECONDS);
539 }
540 catch(TimeoutException e)
541 {
542 throw new IllegalStateException("The universe has ended",e);
543 }
544 }
545
546 public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
547 TimeoutException
548 {
549 _done.await(timeout,unit);
550 ByteChannel channel=null;
551 org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
552 Throwable exception;
553 synchronized (this)
554 {
555 exception=_exception;
556 if (_connection==null)
557 {
558 exception=_exception;
559 channel=_channel;
560 _channel=null;
561 }
562 else
563 connection=_connection.getConnection();
564 }
565
566 if (channel!=null)
567 closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
568 if (exception!=null)
569 throw new ExecutionException(exception);
570 if (connection!=null)
571 return connection;
572 throw new TimeoutException();
573 }
574
575 private void closeChannel(ByteChannel channel,int code, String message)
576 {
577 try
578 {
579 _websocket.onClose(code,message);
580 }
581 catch(Exception e)
582 {
583 __log.warn(e);
584 }
585
586 try
587 {
588 channel.close();
589 }
590 catch(IOException e)
591 {
592 __log.debug(e);
593 }
594 }
595 }
596 }