View Javadoc
1   /*
2    * Copyright (C) 2010, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.storage.pack;
45  
46  import java.io.IOException;
47  import java.util.ArrayList;
48  import java.util.Collections;
49  import java.util.Comparator;
50  import java.util.Iterator;
51  import java.util.LinkedList;
52  import java.util.List;
53  import java.util.concurrent.Callable;
54  
55  import org.eclipse.jgit.lib.ObjectReader;
56  import org.eclipse.jgit.lib.ThreadSafeProgressMonitor;
57  import org.eclipse.jgit.storage.pack.PackConfig;
58  
59  final class DeltaTask implements Callable<Object> {
60  	static final long MAX_METER = 9 << 20;
61  
62  	static final class Block {
63  		private static final int MIN_TOP_PATH = 50 << 20;
64  
65  		final List<DeltaTask> tasks;
66  		final int threads;
67  		final PackConfig config;
68  		final ObjectReader templateReader;
69  		final DeltaCache dc;
70  		final ThreadSafeProgressMonitor pm;
71  		final ObjectToPack[] list;
72  		final int beginIndex;
73  		final int endIndex;
74  
75  		private long totalWeight;
76  		private long bytesPerUnit;
77  
78  		Block(int threads, PackConfig config, ObjectReader reader,
79  				DeltaCache dc, ThreadSafeProgressMonitor pm,
80  				ObjectToPack[] list, int begin, int end) {
81  			this.tasks = new ArrayList<DeltaTask>(threads);
82  			this.threads = threads;
83  			this.config = config;
84  			this.templateReader = reader;
85  			this.dc = dc;
86  			this.pm = pm;
87  			this.list = list;
88  			this.beginIndex = begin;
89  			this.endIndex = end;
90  		}
91  
92  		int cost() {
93  			int d = (int) (totalWeight / bytesPerUnit);
94  			if (totalWeight % bytesPerUnit != 0)
95  				d++;
96  			return d;
97  		}
98  
99  		synchronized DeltaWindow stealWork(DeltaTask forThread) {
100 			for (;;) {
101 				DeltaTask maxTask = null;
102 				Slice maxSlice = null;
103 				int maxWork = 0;
104 
105 				for (DeltaTask task : tasks) {
106 					Slice s = task.remaining();
107 					if (s != null && maxWork < s.size()) {
108 						maxTask = task;
109 						maxSlice = s;
110 						maxWork = s.size();
111 					}
112 				}
113 				if (maxTask == null)
114 					return null;
115 				if (maxTask.tryStealWork(maxSlice))
116 					return forThread.initWindow(maxSlice);
117 			}
118 		}
119 
120 		void partitionTasks() {
121 			ArrayList<WeightedPath> topPaths = computeTopPaths();
122 			Iterator<WeightedPath> topPathItr = topPaths.iterator();
123 			int nextTop = 0;
124 			long weightPerThread = Math.max(totalWeight / threads, 1);
125 			for (int i = beginIndex; i < endIndex;) {
126 				DeltaTask task = new DeltaTask(this);
127 				long w = 0;
128 
129 				// Assign the thread one top path.
130 				if (topPathItr.hasNext()) {
131 					WeightedPath p = topPathItr.next();
132 					w += p.weight;
133 					task.add(p.slice);
134 				}
135 
136 				// Assign the task thread ~average weight.
137 				int s = i;
138 				for (; w < weightPerThread && i < endIndex;) {
139 					if (nextTop < topPaths.size()
140 							&& i == topPaths.get(nextTop).slice.beginIndex) {
141 						if (s < i)
142 							task.add(new Slice(s, i));
143 						s = i = topPaths.get(nextTop++).slice.endIndex;
144 					} else
145 						w += list[i++].getWeight();
146 				}
147 
148 				// Round up the slice to the end of a path.
149 				if (s < i) {
150 					int h = list[i - 1].getPathHash();
151 					while (i < endIndex) {
152 						if (h == list[i].getPathHash())
153 							i++;
154 						else
155 							break;
156 					}
157 					task.add(new Slice(s, i));
158 				}
159 				if (!task.slices.isEmpty())
160 					tasks.add(task);
161 			}
162 			while (topPathItr.hasNext()) {
163 				WeightedPath p = topPathItr.next();
164 				DeltaTask task = new DeltaTask(this);
165 				task.add(p.slice);
166 				tasks.add(task);
167 			}
168 
169 			topPaths = null;
170 		}
171 
172 		private ArrayList<WeightedPath> computeTopPaths() {
173 			ArrayList<WeightedPath> topPaths = new ArrayList<WeightedPath>(
174 					threads);
175 			int cp = beginIndex;
176 			int ch = list[cp].getPathHash();
177 			long cw = list[cp].getWeight();
178 			totalWeight = list[cp].getWeight();
179 
180 			for (int i = cp + 1; i < endIndex; i++) {
181 				ObjectToPack o = list[i];
182 				if (ch != o.getPathHash()) {
183 					if (MIN_TOP_PATH < cw) {
184 						if (topPaths.size() < threads) {
185 							Slice s = new Slice(cp, i);
186 							topPaths.add(new WeightedPath(cw, s));
187 							if (topPaths.size() == threads)
188 								Collections.sort(topPaths);
189 						} else if (topPaths.get(0).weight < cw) {
190 							Slice s = new Slice(cp, i);
191 							WeightedPath p = new WeightedPath(cw, s);
192 							topPaths.set(0, p);
193 							if (p.compareTo(topPaths.get(1)) > 0)
194 								Collections.sort(topPaths);
195 						}
196 					}
197 					cp = i;
198 					ch = o.getPathHash();
199 					cw = 0;
200 				}
201 				if (o.isEdge() || o.doNotAttemptDelta())
202 					continue;
203 				cw += o.getWeight();
204 				totalWeight += o.getWeight();
205 			}
206 
207 			// Sort by starting index to identify gaps later.
208 			Collections.sort(topPaths, new Comparator<WeightedPath>() {
209 				public int compare(WeightedPath a, WeightedPath b) {
210 					return a.slice.beginIndex - b.slice.beginIndex;
211 				}
212 			});
213 
214 			bytesPerUnit = 1;
215 			while (MAX_METER <= (totalWeight / bytesPerUnit))
216 				bytesPerUnit <<= 10;
217 			return topPaths;
218 		}
219 	}
220 
221 	static final class WeightedPath implements Comparable<WeightedPath> {
222 		final long weight;
223 		final Slice slice;
224 
225 		WeightedPath(long weight, Slice s) {
226 			this.weight = weight;
227 			this.slice = s;
228 		}
229 
230 		public int compareTo(WeightedPath o) {
231 			int cmp = Long.signum(weight - o.weight);
232 			if (cmp != 0)
233 				return cmp;
234 			return slice.beginIndex - o.slice.beginIndex;
235 		}
236 	}
237 
238 	static final class Slice {
239 		final int beginIndex;
240 		final int endIndex;
241 
242 		Slice(int b, int e) {
243 			beginIndex = b;
244 			endIndex = e;
245 		}
246 
247 		final int size() {
248 			return endIndex - beginIndex;
249 		}
250 	}
251 
252 	private final Block block;
253 	private final LinkedList<Slice> slices;
254 
255 	private ObjectReader or;
256 	private DeltaWindow dw;
257 
258 	DeltaTask(Block b) {
259 		this.block = b;
260 		this.slices = new LinkedList<Slice>();
261 	}
262 
263 	void add(Slice s) {
264 		if (!slices.isEmpty()) {
265 			Slice last = slices.getLast();
266 			if (last.endIndex == s.beginIndex) {
267 				slices.removeLast();
268 				slices.add(new Slice(last.beginIndex, s.endIndex));
269 				return;
270 			}
271 		}
272 		slices.add(s);
273 	}
274 
275 	public Object call() throws Exception {
276 		or = block.templateReader.newReader();
277 		try {
278 			DeltaWindow w;
279 			for (;;) {
280 				synchronized (this) {
281 					if (slices.isEmpty())
282 						break;
283 					w = initWindow(slices.removeFirst());
284 				}
285 				runWindow(w);
286 			}
287 			while ((w = block.stealWork(this)) != null)
288 				runWindow(w);
289 		} finally {
290 			block.pm.endWorker();
291 			or.close();
292 			or = null;
293 		}
294 		return null;
295 	}
296 
297 	DeltaWindow initWindow(Slice s) {
298 		DeltaWindow w = new DeltaWindow(block.config, block.dc,
299 				or, block.pm, block.bytesPerUnit,
300 				block.list, s.beginIndex, s.endIndex);
301 		synchronized (this) {
302 			dw = w;
303 		}
304 		return w;
305 	}
306 
307 	private void runWindow(DeltaWindow w) throws IOException {
308 		try {
309 			w.search();
310 		} finally {
311 			synchronized (this) {
312 				dw = null;
313 			}
314 		}
315 	}
316 
317 	synchronized Slice remaining() {
318 		if (!slices.isEmpty())
319 			return slices.getLast();
320 		DeltaWindow d = dw;
321 		return d != null ? d.remaining() : null;
322 	}
323 
324 	synchronized boolean tryStealWork(Slice s) {
325 		if (!slices.isEmpty() && slices.getLast().beginIndex == s.beginIndex) {
326 			slices.removeLast();
327 			return true;
328 		}
329 		DeltaWindow d = dw;
330 		return d != null ? d.tryStealWork(s) : false;
331 	}
332 }