1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.pack;
45
46 import java.io.IOException;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.Comparator;
50 import java.util.Iterator;
51 import java.util.LinkedList;
52 import java.util.List;
53 import java.util.concurrent.Callable;
54
55 import org.eclipse.jgit.lib.ObjectReader;
56 import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
57 import org.eclipse.jgit.storage.pack.PackConfig;
58
59 final class DeltaTask implements Callable<Object> {
60 static final long MAX_METER = 9 << 20;
61
62 static final class Block {
/**
 * Minimum accumulated weight (50 MiB) a run of same-path objects must
 * exceed before it is considered as a top path.
 */
private static final int MIN_TOP_PATH = 50 * 1024 * 1024;

/** Tasks created by {@link #partitionTasks()}. */
final List<DeltaTask> tasks;

/** Number of worker threads the range is divided among. */
final int threads;

/** Configuration handed to every DeltaWindow created for this block. */
final PackConfig config;

/** Reader from which each task derives its own reader. */
final ObjectReader templateReader;

/** Delta cache handed to every DeltaWindow created for this block. */
final DeltaCache dc;

/** Progress monitor shared by all tasks of this block. */
final ThreadSafeProgressMonitor pm;

/** Objects being processed; only [beginIndex, endIndex) is used. */
final ObjectToPack[] list;

/** First index (inclusive) of the range covered by this block. */
final int beginIndex;

/** Last index (exclusive) of the range covered by this block. */
final int endIndex;

/** Sum of counted object weights; set by computeTopPaths(). */
private long totalWeight;

/** Scale factor applied to weights; set by computeTopPaths(). */
private long bytesPerUnit;

/**
 * Creates a block covering {@code list[begin..end)}.
 *
 * @param threads
 *            number of worker threads to partition the range for.
 * @param config
 *            configuration passed on to each delta window.
 * @param reader
 *            template reader; each task creates its own from it.
 * @param dc
 *            delta cache passed on to each delta window.
 * @param pm
 *            progress monitor shared by all tasks.
 * @param list
 *            objects to process.
 * @param begin
 *            first index of the range, inclusive.
 * @param end
 *            last index of the range, exclusive.
 */
Block(int threads, PackConfig config, ObjectReader reader,
        DeltaCache dc, ThreadSafeProgressMonitor pm,
        ObjectToPack[] list, int begin, int end) {
    this.threads = threads;
    this.tasks = new ArrayList<DeltaTask>(threads);

    this.config = config;
    this.templateReader = reader;
    this.dc = dc;
    this.pm = pm;

    this.list = list;
    this.beginIndex = begin;
    this.endIndex = end;
}
91
92 int cost() {
93 int d = (int) (totalWeight / bytesPerUnit);
94 if (totalWeight % bytesPerUnit != 0)
95 d++;
96 return d;
97 }
98
99 synchronized DeltaWindow stealWork(DeltaTask forThread) {
100 for (;;) {
101 DeltaTask maxTask = null;
102 Slice maxSlice = null;
103 int maxWork = 0;
104
105 for (DeltaTask task : tasks) {
106 Slice s = task.remaining();
107 if (s != null && maxWork < s.size()) {
108 maxTask = task;
109 maxSlice = s;
110 maxWork = s.size();
111 }
112 }
113 if (maxTask == null)
114 return null;
115 if (maxTask.tryStealWork(maxSlice))
116 return forThread.initWindow(maxSlice);
117 }
118 }
119
120 void partitionTasks() {
121 ArrayList<WeightedPath> topPaths = computeTopPaths();
122 Iterator<WeightedPath> topPathItr = topPaths.iterator();
123 int nextTop = 0;
124 long weightPerThread = Math.max(totalWeight / threads, 1);
125 for (int i = beginIndex; i < endIndex;) {
126 DeltaTask task = new DeltaTask(this);
127 long w = 0;
128
129
130 if (topPathItr.hasNext()) {
131 WeightedPath p = topPathItr.next();
132 w += p.weight;
133 task.add(p.slice);
134 }
135
136
137 int s = i;
138 for (; w < weightPerThread && i < endIndex;) {
139 if (nextTop < topPaths.size()
140 && i == topPaths.get(nextTop).slice.beginIndex) {
141 if (s < i)
142 task.add(new Slice(s, i));
143 s = i = topPaths.get(nextTop++).slice.endIndex;
144 } else
145 w += list[i++].getWeight();
146 }
147
148
149 if (s < i) {
150 int h = list[i - 1].getPathHash();
151 while (i < endIndex) {
152 if (h == list[i].getPathHash())
153 i++;
154 else
155 break;
156 }
157 task.add(new Slice(s, i));
158 }
159 if (!task.slices.isEmpty())
160 tasks.add(task);
161 }
162 while (topPathItr.hasNext()) {
163 WeightedPath p = topPathItr.next();
164 DeltaTask task = new DeltaTask(this);
165 task.add(p.slice);
166 tasks.add(task);
167 }
168
169 topPaths = null;
170 }
171
172 private ArrayList<WeightedPath> computeTopPaths() {
173 ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
174 threads);
175 int cp = beginIndex;
176 int ch = list[cp].getPathHash();
177 long cw = list[cp].getWeight();
178 totalWeight = list[cp].getWeight();
179
180 for (int i = cp + 1; i < endIndex; i++) {
181 ObjectToPack o = list[i];
182 if (ch != o.getPathHash()) {
183 if (MIN_TOP_PATH < cw) {
184 if (topPaths.size() < threads) {
185 Slice s = new Slice(cp, i);
186 topPaths.add(new WeightedPath(cw, s));
187 if (topPaths.size() == threads)
188 Collections.sort(topPaths);
189 } else if (topPaths.get(0).weight < cw) {
190 Slice s = new Slice(cp, i);
191 WeightedPath p = new WeightedPath(cw, s);
192 topPaths.set(0, p);
193 if (p.compareTo(topPaths.get(1)) > 0)
194 Collections.sort(topPaths);
195 }
196 }
197 cp = i;
198 ch = o.getPathHash();
199 cw = 0;
200 }
201 if (o.isEdge() || o.doNotAttemptDelta())
202 continue;
203 cw += o.getWeight();
204 totalWeight += o.getWeight();
205 }
206
207
208 Collections.sort(topPaths, new Comparator<WeightedPath>() {
209 public int compare(WeightedPath a, WeightedPath b) {
210 return a.slice.beginIndex - b.slice.beginIndex;
211 }
212 });
213
214 bytesPerUnit = 1;
215 while (MAX_METER <= (totalWeight / bytesPerUnit))
216 bytesPerUnit <<= 10;
217 return topPaths;
218 }
219 }
220
221 static final class WeightedPath implements Comparable<WeightedPath> {
222 final long weight;
223 final Slice slice;
224
225 WeightedPath(long weight, Slice s) {
226 this.weight = weight;
227 this.slice = s;
228 }
229
230 public int compareTo(WeightedPath o) {
231 int cmp = Long.signum(weight - o.weight);
232 if (cmp != 0)
233 return cmp;
234 return slice.beginIndex - o.slice.beginIndex;
235 }
236 }
237
238 static final class Slice {
239 final int beginIndex;
240 final int endIndex;
241
242 Slice(int b, int e) {
243 beginIndex = b;
244 endIndex = e;
245 }
246
247 final int size() {
248 return endIndex - beginIndex;
249 }
250 }
251
/** Block this task belongs to; source of shared configuration. */
private final Block block;

/** Slices queued for this task that have not been started yet. */
private final LinkedList<Slice> slices;

/** Reader owned by this task while {@link #call()} is running. */
private ObjectReader or;

/** Window currently being searched, or {@code null} when idle. */
private DeltaWindow dw;

/**
 * Creates a task with no slices assigned.
 *
 * @param b
 *            the block this task is part of.
 */
DeltaTask(Block b) {
    this.slices = new LinkedList<Slice>();
    this.block = b;
}
262
263 void add(Slice s) {
264 if (!slices.isEmpty()) {
265 Slice last = slices.getLast();
266 if (last.endIndex == s.beginIndex) {
267 slices.removeLast();
268 slices.add(new Slice(last.beginIndex, s.endIndex));
269 return;
270 }
271 }
272 slices.add(s);
273 }
274
275 public Object call() throws Exception {
276 or = block.templateReader.newReader();
277 try {
278 DeltaWindow w;
279 for (;;) {
280 synchronized (this) {
281 if (slices.isEmpty())
282 break;
283 w = initWindow(slices.removeFirst());
284 }
285 runWindow(w);
286 }
287 while ((w = block.stealWork(this)) != null)
288 runWindow(w);
289 } finally {
290 block.pm.endWorker();
291 or.close();
292 or = null;
293 }
294 return null;
295 }
296
297 DeltaWindow initWindow(Slice s) {
298 DeltaWindow w = new DeltaWindow(block.config, block.dc,
299 or, block.pm, block.bytesPerUnit,
300 block.list, s.beginIndex, s.endIndex);
301 synchronized (this) {
302 dw = w;
303 }
304 return w;
305 }
306
307 private void runWindow(DeltaWindow w) throws IOException {
308 try {
309 w.search();
310 } finally {
311 synchronized (this) {
312 dw = null;
313 }
314 }
315 }
316
317 synchronized Slice remaining() {
318 if (!slices.isEmpty())
319 return slices.getLast();
320 DeltaWindow d = dw;
321 return d != null ? d.remaining() : null;
322 }
323
324 synchronized boolean tryStealWork(Slice s) {
325 if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
326 slices.removeLast();
327 return true;
328 }
329 DeltaWindow d = dw;
330 return d != null ? d.tryStealWork(s) : false;
331 }
332 }