1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.util.io;
45
46 import java.io.IOException;
47 import java.io.InterruptedIOException;
48 import java.io.OutputStream;
49 import java.util.concurrent.ArrayBlockingQueue;
50 import java.util.concurrent.Callable;
51 import java.util.concurrent.ExecutionException;
52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Future;
54 import java.util.concurrent.RejectedExecutionException;
55 import java.util.concurrent.ThreadFactory;
56 import java.util.concurrent.ThreadPoolExecutor;
57 import java.util.concurrent.TimeUnit;
58 import java.util.concurrent.TimeoutException;
59 import java.util.concurrent.atomic.AtomicInteger;
60
61 import org.eclipse.jgit.internal.JGitText;
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 public class IsolatedOutputStream extends OutputStream {
78 private final OutputStream dst;
79 private final ExecutorService copier;
80 private Future<Void> pending;
81
82
83
84
85
86
87
88 public IsolatedOutputStream(OutputStream out) {
89 dst = out;
90 copier = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
91 new ArrayBlockingQueue<Runnable>(1), new NamedThreadFactory());
92 }
93
94
95 @Override
96 public void write(int ch) throws IOException {
97 write(new byte[] { (byte) ch }, 0, 1);
98 }
99
100
101 @Override
102 public void write(byte[] buf, int pos, int cnt)
103 throws IOException {
104 checkClosed();
105 execute(new Callable<Void>() {
106 @Override
107 public Void call() throws IOException {
108 dst.write(buf, pos, cnt);
109 return null;
110 }
111 });
112 }
113
114
115 @Override
116 public void flush() throws IOException {
117 checkClosed();
118 execute(new Callable<Void>() {
119 @Override
120 public Void call() throws IOException {
121 dst.flush();
122 return null;
123 }
124 });
125 }
126
127
128 @Override
129 public void close() throws IOException {
130 if (!copier.isShutdown()) {
131 try {
132 if (pending == null || tryCleanClose()) {
133 cleanClose();
134 } else {
135 dirtyClose();
136 }
137 } finally {
138 copier.shutdown();
139 }
140 }
141 }
142
143 private boolean tryCleanClose() {
144
145
146
147
148
149 try {
150 pending.get(0, TimeUnit.MILLISECONDS);
151 pending = null;
152 return true;
153 } catch (TimeoutException | InterruptedException e) {
154 return false;
155 } catch (ExecutionException e) {
156 pending = null;
157 return true;
158 }
159 }
160
161 private void cleanClose() throws IOException {
162 execute(new Callable<Void>() {
163 @Override
164 public Void call() throws IOException {
165 dst.close();
166 return null;
167 }
168 });
169 }
170
171 private void dirtyClose() throws IOException {
172
173
174
175
176
177 pending.cancel(true);
178
179 Future<Void> close;
180 try {
181 close = copier.submit(new Callable<Void>() {
182 @Override
183 public Void call() throws IOException {
184 dst.close();
185 return null;
186 }
187 });
188 } catch (RejectedExecutionException e) {
189 throw new IOException(e);
190 }
191 try {
192 close.get(200, TimeUnit.MILLISECONDS);
193 } catch (InterruptedException | TimeoutException e) {
194 close.cancel(true);
195 throw new IOException(e);
196 } catch (ExecutionException e) {
197 throw new IOException(e.getCause());
198 }
199 }
200
201 private void checkClosed() throws IOException {
202 if (copier.isShutdown()) {
203 throw new IOException(JGitText.get().closed);
204 }
205 }
206
207 private void execute(Callable<Void> task) throws IOException {
208 if (pending != null) {
209
210 checkedGet(pending);
211 }
212 try {
213 pending = copier.submit(task);
214 } catch (RejectedExecutionException e) {
215 throw new IOException(e);
216 }
217 checkedGet(pending);
218 pending = null;
219 }
220
221 private static void checkedGet(Future<Void> future) throws IOException {
222 try {
223 future.get();
224 } catch (InterruptedException e) {
225 throw interrupted(e);
226 } catch (ExecutionException e) {
227 throw new IOException(e.getCause());
228 }
229 }
230
231 private static InterruptedIOException interrupted(InterruptedException c) {
232 InterruptedIOException e = new InterruptedIOException();
233 e.initCause(c);
234 return e;
235 }
236
237 private static class NamedThreadFactory implements ThreadFactory {
238 private static final AtomicInteger cnt = new AtomicInteger();
239
240 @Override
241 public Thread newThread(Runnable r) {
242 int n = cnt.incrementAndGet();
243 String name = IsolatedOutputStream.class.getSimpleName() + '-' + n;
244 return new Thread(r, name);
245 }
246 }
247 }