View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.util.io;
12  
13  import java.io.IOException;
14  import java.io.InterruptedIOException;
15  import java.io.OutputStream;
16  import java.util.concurrent.ArrayBlockingQueue;
17  import java.util.concurrent.Callable;
18  import java.util.concurrent.ExecutionException;
19  import java.util.concurrent.ExecutorService;
20  import java.util.concurrent.Future;
21  import java.util.concurrent.RejectedExecutionException;
22  import java.util.concurrent.ThreadFactory;
23  import java.util.concurrent.ThreadPoolExecutor;
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.TimeoutException;
26  import java.util.concurrent.atomic.AtomicInteger;
27  
28  import org.eclipse.jgit.internal.JGitText;
29  
30  /**
31   * OutputStream isolated from interrupts.
32   * <p>
33   * Wraps an OutputStream to prevent interrupts during writes from being made
34   * visible to that stream instance. This works around buggy or difficult
35   * OutputStream implementations like JSch that cannot gracefully handle an
36   * interrupt during write.
37   * <p>
38   * Every write (or flush) requires a context switch to another thread. Callers
39   * should wrap this stream with {@code BufferedOutputStream} using a suitable
40   * buffer size to amortize the cost of context switches.
41   *
42   * @since 4.6
43   */
44  public class IsolatedOutputStream extends OutputStream {
45  	private final OutputStream dst;
46  	private final ExecutorService copier;
47  	private Future<Void> pending;
48  
49  	/**
50  	 * Wraps an OutputStream.
51  	 *
52  	 * @param out
53  	 *            stream to send all writes to.
54  	 */
55  	public IsolatedOutputStream(OutputStream out) {
56  		dst = out;
57  		copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
58  				new ArrayBlockingQueue<>(1), new NamedThreadFactory());
59  	}
60  
61  	/** {@inheritDoc} */
62  	@Override
63  	public void write(int ch) throws IOException {
64  		write(new byte[] { (byte) ch }, 0, 1);
65  	}
66  
67  	/** {@inheritDoc} */
68  	@Override
69  	public void write(byte[] buf, int pos, int cnt)
70  			throws IOException {
71  		checkClosed();
72  		execute(() -> {
73  			dst.write(buf, pos, cnt);
74  			return null;
75  		});
76  	}
77  
78  	/** {@inheritDoc} */
79  	@Override
80  	public void flush() throws IOException {
81  		checkClosed();
82  		execute(() -> {
83  			dst.flush();
84  			return null;
85  		});
86  	}
87  
88  	/** {@inheritDoc} */
89  	@Override
90  	public void close() throws IOException {
91  		if (!copier.isShutdown()) {
92  			try {
93  				if (pending == null || tryCleanClose()) {
94  					cleanClose();
95  				} else {
96  					dirtyClose();
97  				}
98  			} finally {
99  				copier.shutdown();
100 			}
101 		}
102 	}
103 
104 	private boolean tryCleanClose() {
105 		/*
106 		 * If the caller stopped waiting for a prior write or flush, they could
107 		 * be trying to close a stream that is still in-use. Check if the prior
108 		 * operation ended in a predictable way.
109 		 */
110 		try {
111 			pending.get(0, TimeUnit.MILLISECONDS);
112 			pending = null;
113 			return true;
114 		} catch (TimeoutException | InterruptedException e) {
115 			return false;
116 		} catch (ExecutionException e) {
117 			pending = null;
118 			return true;
119 		}
120 	}
121 
122 	private void cleanClose() throws IOException {
123 		execute(() -> {
124 			dst.close();
125 			return null;
126 		});
127 	}
128 
129 	private void dirtyClose() throws IOException {
130 		/*
131 		 * Interrupt any still pending write or flush operation. This may cause
132 		 * massive failures inside of the stream, but its going to be closed as
133 		 * the next step.
134 		 */
135 		pending.cancel(true);
136 
137 		Future<Void> close;
138 		try {
139 			close = copier.submit(() -> {
140 				dst.close();
141 				return null;
142 			});
143 		} catch (RejectedExecutionException e) {
144 			throw new IOException(e);
145 		}
146 		try {
147 			close.get(200, TimeUnit.MILLISECONDS);
148 		} catch (InterruptedException | TimeoutException e) {
149 			close.cancel(true);
150 			throw new IOException(e);
151 		} catch (ExecutionException e) {
152 			throw new IOException(e.getCause());
153 		}
154 	}
155 
156 	private void checkClosed() throws IOException {
157 		if (copier.isShutdown()) {
158 			throw new IOException(JGitText.get().closed);
159 		}
160 	}
161 
162 	private void execute(Callable<Void> task) throws IOException {
163 		if (pending != null) {
164 			// Check (and rethrow) any prior failed operation.
165 			checkedGet(pending);
166 		}
167 		try {
168 			pending = copier.submit(task);
169 		} catch (RejectedExecutionException e) {
170 			throw new IOException(e);
171 		}
172 		checkedGet(pending);
173 		pending = null;
174 	}
175 
176 	private static void checkedGet(Future<Void> future) throws IOException {
177 		try {
178 			future.get();
179 		} catch (InterruptedException e) {
180 			throw interrupted(e);
181 		} catch (ExecutionException e) {
182 			throw new IOException(e.getCause());
183 		}
184 	}
185 
186 	private static InterruptedIOException interrupted(InterruptedException c) {
187 		InterruptedIOException e = new InterruptedIOException();
188 		e.initCause(c);
189 		return e;
190 	}
191 
192 	private static class NamedThreadFactory implements ThreadFactory {
193 		private static final AtomicInteger cnt = new AtomicInteger();
194 
195 		@Override
196 		public Thread newThread(Runnable r) {
197 			int n = cnt.incrementAndGet();
198 			String name = IsolatedOutputStream.class.getSimpleName() + '-' + n;
199 			return new Thread(r, name);
200 		}
201 	}
202 }