1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.storage.pack;
45
46 import java.io.IOException;
47 import java.util.ArrayList;
48 import java.util.Collections;
49 import java.util.Iterator;
50 import java.util.LinkedList;
51 import java.util.List;
52 import java.util.concurrent.Callable;
53
54 import org.eclipse.jgit.lib.ObjectReader;
55 import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
56 import org.eclipse.jgit.storage.pack.PackConfig;
57
58 final class DeltaTask implements Callable<Object> {
59 static final long MAX_METER = 9 << 20;
60
61 static final class Block {
62 private static final int MIN_TOP_PATH = 50 << 20;
63
64 final List<DeltaTask> tasks;
65 final int threads;
66 final PackConfig config;
67 final ObjectReader templateReader;
68 final DeltaCache dc;
69 final ThreadSafeProgressMonitor pm;
70 final ObjectToPack[] list;
71 final int beginIndex;
72 final int endIndex;
73
74 private long totalWeight;
75 long bytesPerUnit;
76
77 Block(int threads, PackConfig config, ObjectReader reader,
78 DeltaCache dc, ThreadSafeProgressMonitor pm,
79 ObjectToPack[] list, int begin, int end) {
80 this.tasks = new ArrayList<>(threads);
81 this.threads = threads;
82 this.config = config;
83 this.templateReader = reader;
84 this.dc = dc;
85 this.pm = pm;
86 this.list = list;
87 this.beginIndex = begin;
88 this.endIndex = end;
89 }
90
91 int cost() {
92 int d = (int) (totalWeight / bytesPerUnit);
93 if (totalWeight % bytesPerUnit != 0)
94 d++;
95 return d;
96 }
97
98 synchronized DeltaWindow stealWork(DeltaTask forThread) {
99 for (;;) {
100 DeltaTask maxTask = null;
101 Slice maxSlice = null;
102 int maxWork = 0;
103
104 for (DeltaTask task : tasks) {
105 Slice s = task.remaining();
106 if (s != null && maxWork < s.size()) {
107 maxTask = task;
108 maxSlice = s;
109 maxWork = s.size();
110 }
111 }
112 if (maxTask == null) {
113 return null;
114 }
115 if (maxTask.tryStealWork(maxSlice)) {
116 return forThread.initWindow(maxSlice);
117 }
118 }
119 }
120
121 void partitionTasks() {
122 ArrayList<WeightedPath> topPaths = computeTopPaths();
123 Iterator<WeightedPath> topPathItr = topPaths.iterator();
124 int nextTop = 0;
125 long weightPerThread = Math.max(totalWeight / threads, 1);
126 for (int i = beginIndex; i < endIndex;) {
127 DeltaTask task = new DeltaTask(this);
128 long w = 0;
129
130
131 if (topPathItr.hasNext()) {
132 WeightedPath p = topPathItr.next();
133 w += p.weight;
134 task.add(p.slice);
135 }
136
137
138 int s = i;
139 for (; w < weightPerThread && i < endIndex;) {
140 if (nextTop < topPaths.size()
141 && i == topPaths.get(nextTop).slice.beginIndex) {
142 if (s < i) {
143 task.add(new Slice(s, i));
144 }
145 s = i = topPaths.get(nextTop++).slice.endIndex;
146 } else {
147 w += getAdjustedWeight(list[i++]);
148 }
149 }
150
151
152 if (s < i) {
153 int h = list[i - 1].getPathHash();
154 while (i < endIndex) {
155 if (h == list[i].getPathHash()) {
156 i++;
157 } else {
158 break;
159 }
160 }
161 task.add(new Slice(s, i));
162 }
163 if (!task.slices.isEmpty()) {
164 tasks.add(task);
165 }
166 }
167 while (topPathItr.hasNext()) {
168 WeightedPath p = topPathItr.next();
169 DeltaTask task = new DeltaTask(this);
170 task.add(p.slice);
171 tasks.add(task);
172 }
173
174 topPaths = null;
175 }
176
177 private ArrayList<WeightedPath> computeTopPaths() {
178 ArrayList<WeightedPath> topPaths = new ArrayList<>(
179 threads);
180 int cp = beginIndex;
181 int ch = list[cp].getPathHash();
182 long cw = getAdjustedWeight(list[cp]);
183 totalWeight = cw;
184
185 for (int i = cp + 1; i < endIndex; i++) {
186 ObjectToPack o = list[i];
187 if (ch != o.getPathHash()) {
188 if (MIN_TOP_PATH < cw) {
189 if (topPaths.size() < threads) {
190 Slice s = new Slice(cp, i);
191 topPaths.add(new WeightedPath(cw, s));
192 if (topPaths.size() == threads) {
193 Collections.sort(topPaths);
194 }
195 } else if (topPaths.get(0).weight < cw) {
196 Slice s = new Slice(cp, i);
197 WeightedPath p = new WeightedPath(cw, s);
198 topPaths.set(0, p);
199 if (p.compareTo(topPaths.get(1)) > 0) {
200 Collections.sort(topPaths);
201 }
202 }
203 }
204 cp = i;
205 ch = o.getPathHash();
206 cw = 0;
207 }
208 int weight = getAdjustedWeight(o);
209 cw += weight;
210 totalWeight += weight;
211 }
212
213
214 Collections.sort(topPaths, (WeightedPath a,
215 WeightedPath b) -> a.slice.beginIndex - b.slice.beginIndex);
216
217 bytesPerUnit = 1;
218 while (MAX_METER <= (totalWeight / bytesPerUnit)) {
219 bytesPerUnit <<= 10;
220 }
221 return topPaths;
222 }
223 }
224
225 static int getAdjustedWeight(ObjectToPack o) {
226
227
228 if (o.isEdge() || o.doNotAttemptDelta()) {
229 return 0;
230 }
231 return o.getWeight();
232 }
233
234 static final class WeightedPath implements Comparable<WeightedPath> {
235 final long weight;
236 final Slice slice;
237
238 WeightedPath(long weight, Slice s) {
239 this.weight = weight;
240 this.slice = s;
241 }
242
243 @Override
244 public int compareTo(WeightedPath o) {
245 int cmp = Long.signum(weight - o.weight);
246 if (cmp != 0) {
247 return cmp;
248 }
249 return slice.beginIndex - o.slice.beginIndex;
250 }
251 }
252
253 static final class Slice {
254 final int beginIndex;
255 final int endIndex;
256
257 Slice(int b, int e) {
258 beginIndex = b;
259 endIndex = e;
260 }
261
262 final int size() {
263 return endIndex - beginIndex;
264 }
265 }
266
267 private final Block block;
268 final LinkedList<Slice> slices;
269
270 private ObjectReader or;
271 private DeltaWindow dw;
272
/**
 * Creates a task with no slices assigned yet.
 *
 * @param b
 *            the block this task belongs to
 */
DeltaTask(Block b) {
	block = b;
	slices = new LinkedList<>();
}
277
278 void add(Slice s) {
279 if (!slices.isEmpty()) {
280 Slice last = slices.getLast();
281 if (last.endIndex == s.beginIndex) {
282 slices.removeLast();
283 slices.add(new Slice(last.beginIndex, s.endIndex));
284 return;
285 }
286 }
287 slices.add(s);
288 }
289
290
291 @Override
292 public Object call() throws Exception {
293 or = block.templateReader.newReader();
294 try {
295 DeltaWindow w;
296 for (;;) {
297 synchronized (this) {
298 if (slices.isEmpty()) {
299 break;
300 }
301 w = initWindow(slices.removeFirst());
302 }
303 runWindow(w);
304 }
305 while ((w = block.stealWork(this)) != null) {
306 runWindow(w);
307 }
308 } finally {
309 block.pm.endWorker();
310 or.close();
311 or = null;
312 }
313 return null;
314 }
315
316 DeltaWindow initWindow(Slice s) {
317 DeltaWindow w = new DeltaWindow(block.config, block.dc,
318 or, block.pm, block.bytesPerUnit,
319 block.list, s.beginIndex, s.endIndex);
320 synchronized (this) {
321 dw = w;
322 }
323 return w;
324 }
325
326 private void runWindow(DeltaWindow w) throws IOException {
327 try {
328 w.search();
329 } finally {
330 synchronized (this) {
331 dw = null;
332 }
333 }
334 }
335
336 synchronized Slice remaining() {
337 if (!slices.isEmpty()) {
338 return slices.getLast();
339 }
340 DeltaWindow d = dw;
341 return d != null ? d.remaining() : null;
342 }
343
344 synchronized boolean tryStealWork(Slice s) {
345 if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
346 slices.removeLast();
347 return true;
348 }
349 DeltaWindow d = dw;
350 return d != null ? d.tryStealWork(s) : false;
351 }
352 }