View Javadoc
1   /*
2    * Copyright (C) 2010, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.storage.pack;
12  
13  import java.io.IOException;
14  import java.util.ArrayList;
15  import java.util.Collections;
16  import java.util.Iterator;
17  import java.util.LinkedList;
18  import java.util.List;
19  import java.util.concurrent.Callable;
20  
21  import org.eclipse.jgit.lib.ObjectReader;
22  import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
23  import org.eclipse.jgit.storage.pack.PackConfig;
24  
25  final class DeltaTask implements Callable<Object> {
26  	static final long MAX_METER = 9 << 20;
27  
28  	static final class Block {
29  		private static final int MIN_TOP_PATH = 50 << 20;
30  
31  		final List<DeltaTask> tasks;
32  		final int threads;
33  		final PackConfig config;
34  		final ObjectReader templateReader;
35  		final DeltaCache dc;
36  		final ThreadSafeProgressMonitor pm;
37  		final ObjectToPack[] list;
38  		final int beginIndex;
39  		final int endIndex;
40  
41  		private long totalWeight;
42  		long bytesPerUnit;
43  
44  		Block(int threads, PackConfig config, ObjectReader reader,
45  				DeltaCache dc, ThreadSafeProgressMonitor pm,
46  				ObjectToPack[] list, int begin, int end) {
47  			this.tasks = new ArrayList<>(threads);
48  			this.threads = threads;
49  			this.config = config;
50  			this.templateReader = reader;
51  			this.dc = dc;
52  			this.pm = pm;
53  			this.list = list;
54  			this.beginIndex = begin;
55  			this.endIndex = end;
56  		}
57  
58  		int cost() {
59  			int d = (int) (totalWeight / bytesPerUnit);
60  			if (totalWeight % bytesPerUnit != 0)
61  				d++;
62  			return d;
63  		}
64  
65  		synchronized DeltaWindow stealWork(DeltaTask forThread) {
66  			for (;;) {
67  				DeltaTask maxTask = null;
68  				Slice maxSlice = null;
69  				int maxWork = 0;
70  
71  				for (DeltaTask task : tasks) {
72  					Slice s = task.remaining();
73  					if (s != null && maxWork < s.size()) {
74  						maxTask = task;
75  						maxSlice = s;
76  						maxWork = s.size();
77  					}
78  				}
79  				if (maxTask == null) {
80  					return null;
81  				}
82  				if (maxTask.tryStealWork(maxSlice)) {
83  					return forThread.initWindow(maxSlice);
84  				}
85  			}
86  		}
87  
88  		void partitionTasks() {
89  			ArrayList<WeightedPath> topPaths = computeTopPaths();
90  			Iterator<WeightedPath> topPathItr = topPaths.iterator();
91  			int nextTop = 0;
92  			long weightPerThread = Math.max(totalWeight / threads, 1);
93  			for (int i = beginIndex; i < endIndex;) {
94  				DeltaTask task = new DeltaTask(this);
95  				long w = 0;
96  
97  				// Assign the thread one top path.
98  				if (topPathItr.hasNext()) {
99  					WeightedPath p = topPathItr.next();
100 					w += p.weight;
101 					task.add(p.slice);
102 				}
103 
104 				// Assign the task thread ~average weight.
105 				int s = i;
106 				for (; w < weightPerThread && i < endIndex;) {
107 					if (nextTop < topPaths.size()
108 							&& i == topPaths.get(nextTop).slice.beginIndex) {
109 						if (s < i) {
110 							task.add(new Slice(s, i));
111 						}
112 						s = i = topPaths.get(nextTop++).slice.endIndex;
113 					} else {
114 						w += getAdjustedWeight(list[i++]);
115 					}
116 				}
117 
118 				// Round up the slice to the end of a path.
119 				if (s < i) {
120 					int h = list[i - 1].getPathHash();
121 					while (i < endIndex) {
122 						if (h == list[i].getPathHash()) {
123 							i++;
124 						} else {
125 							break;
126 						}
127 					}
128 					task.add(new Slice(s, i));
129 				}
130 				if (!task.slices.isEmpty()) {
131 					tasks.add(task);
132 				}
133 			}
134 			while (topPathItr.hasNext()) {
135 				WeightedPath p = topPathItr.next();
136 				DeltaTask task = new DeltaTask(this);
137 				task.add(p.slice);
138 				tasks.add(task);
139 			}
140 
141 			topPaths = null;
142 		}
143 
144 		private ArrayList<WeightedPath> computeTopPaths() {
145 			ArrayList<WeightedPath> topPaths = new ArrayList<>(
146 					threads);
147 			int cp = beginIndex;
148 			int ch = list[cp].getPathHash();
149 			long cw = getAdjustedWeight(list[cp]);
150 			totalWeight = cw;
151 
152 			for (int i = cp + 1; i < endIndex; i++) {
153 				ObjectToPack o = list[i];
154 				if (ch != o.getPathHash()) {
155 					if (MIN_TOP_PATH < cw) {
156 						if (topPaths.size() < threads) {
157 							Slice s = new Slice(cp, i);
158 							topPaths.add(new WeightedPath(cw, s));
159 							if (topPaths.size() == threads) {
160 								Collections.sort(topPaths);
161 							}
162 						} else if (topPaths.get(0).weight < cw) {
163 							Slice s = new Slice(cp, i);
164 							WeightedPath p = new WeightedPath(cw, s);
165 							topPaths.set(0, p);
166 							if (p.compareTo(topPaths.get(1)) > 0) {
167 								Collections.sort(topPaths);
168 							}
169 						}
170 					}
171 					cp = i;
172 					ch = o.getPathHash();
173 					cw = 0;
174 				}
175 				int weight = getAdjustedWeight(o);
176 				cw += weight;
177 				totalWeight += weight;
178 			}
179 
180 			// Sort by starting index to identify gaps later.
181 			Collections.sort(topPaths, (WeightedPath a,
182 					WeightedPath b) -> a.slice.beginIndex - b.slice.beginIndex);
183 
184 			bytesPerUnit = 1;
185 			while (MAX_METER <= (totalWeight / bytesPerUnit)) {
186 				bytesPerUnit <<= 10;
187 			}
188 			return topPaths;
189 		}
190 	}
191 
192 	static int getAdjustedWeight(ObjectToPack o) {
193 		// Edge objects and those with reused deltas do not need to be
194 		// compressed. For compression calculations, ignore their weights.
195 		if (o.isEdge() || o.doNotAttemptDelta()) {
196 			return 0;
197 		}
198 		return o.getWeight();
199 	}
200 
201 	static final class WeightedPath implements Comparable<WeightedPath> {
202 		final long weight;
203 		final Slice slice;
204 
205 		WeightedPath(long weight, Slice s) {
206 			this.weight = weight;
207 			this.slice = s;
208 		}
209 
210 		@Override
211 		public int compareTo(WeightedPath o) {
212 			int cmp = Long.signum(weight - o.weight);
213 			if (cmp != 0) {
214 				return cmp;
215 			}
216 			return slice.beginIndex - o.slice.beginIndex;
217 		}
218 	}
219 
220 	static final class Slice {
221 		final int beginIndex;
222 		final int endIndex;
223 
224 		Slice(int b, int e) {
225 			beginIndex = b;
226 			endIndex = e;
227 		}
228 
229 		final int size() {
230 			return endIndex - beginIndex;
231 		}
232 	}
233 
234 	private final Block block;
235 	final LinkedList<Slice> slices;
236 
237 	private ObjectReader or;
238 	private DeltaWindow dw;
239 
240 	DeltaTask(Block b) {
241 		this.block = b;
242 		this.slices = new LinkedList<>();
243 	}
244 
245 	void add(Slice s) {
246 		if (!slices.isEmpty()) {
247 			Slice last = slices.getLast();
248 			if (last.endIndex == s.beginIndex) {
249 				slices.removeLast();
250 				slices.add(new Slice(last.beginIndex, s.endIndex));
251 				return;
252 			}
253 		}
254 		slices.add(s);
255 	}
256 
257 	/** {@inheritDoc} */
258 	@Override
259 	public Object call() throws Exception {
260 		or = block.templateReader.newReader();
261 		try {
262 			DeltaWindow w;
263 			for (;;) {
264 				synchronized (this) {
265 					if (slices.isEmpty()) {
266 						break;
267 					}
268 					w = initWindow(slices.removeFirst());
269 				}
270 				runWindow(w);
271 			}
272 			while ((w = block.stealWork(this)) != null) {
273 				runWindow(w);
274 			}
275 		} finally {
276 			block.pm.endWorker();
277 			or.close();
278 			or = null;
279 		}
280 		return null;
281 	}
282 
283 	DeltaWindow initWindow(Slice s) {
284 		DeltaWindow w = new DeltaWindow(block.config, block.dc,
285 				or, block.pm, block.bytesPerUnit,
286 				block.list, s.beginIndex, s.endIndex);
287 		synchronized (this) {
288 			dw = w;
289 		}
290 		return w;
291 	}
292 
293 	private void runWindow(DeltaWindow w) throws IOException {
294 		try {
295 			w.search();
296 		} finally {
297 			synchronized (this) {
298 				dw = null;
299 			}
300 		}
301 	}
302 
303 	synchronized Slice remaining() {
304 		if (!slices.isEmpty()) {
305 			return slices.getLast();
306 		}
307 		DeltaWindow d = dw;
308 		return d != null ? d.remaining() : null;
309 	}
310 
311 	synchronized boolean tryStealWork(Slice s) {
312 		if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
313 			slices.removeLast();
314 			return true;
315 		}
316 		DeltaWindow d = dw;
317 		return d != null ? d.tryStealWork(s) : false;
318 	}
319 }