/*
 * Copyright (C) 2008-2011, Google Inc.
 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org>
 * and other copyright owners as documented in the project's IP log.
 *
 * This program and the accompanying materials are made available
 * under the terms of the Eclipse Distribution License v1.0 which
 * accompanies this distribution, is reproduced below, and is
 * available at http://www.eclipse.org/org/documents/edl-v10.php
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 * - Redistributions of source code must retain the above copyright
 *   notice, this list of conditions and the following disclaimer.
 *
 * - Redistributions in binary form must reproduce the above
 *   copyright notice, this list of conditions and the following
 *   disclaimer in the documentation and/or other materials provided
 *   with the distribution.
 *
 * - Neither the name of the Eclipse Foundation, Inc. nor the
 *   names of its contributors may be used to endorse or promote
 *   products derived from this software without specific prior
 *   written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
 * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.eclipse.jgit.internal.storage.dfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.LongStream;

import org.eclipse.jgit.annotations.Nullable;
import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt;
/**
 * Caches slices of a
 * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
 * faster read access.
 * <p>
 * The DfsBlockCache serves as a Java-based "buffer cache", loading segments of
 * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
 * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
 * these tiny reads into larger block-sized IO operations.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
 * an array of locks, with the tuple hashed to a lock instance.
 * <p>
 * It's too expensive during object access to be accurate with a least recently
 * used (LRU) algorithm. Strictly ordering every read is a lot of overhead that
 * typically doesn't yield a corresponding benefit to the application. This
 * cache implements a clock replacement algorithm, giving each block one chance
 * to have been accessed during a sweep of the cache to save itself from
 * eviction.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * The key tuple is passed through to methods as a pair of parameters rather
 * than as a single Object, thus reducing the transient memory allocations of
 * callers. It is more efficient to avoid the allocation, as we can't be 100%
 * sure that a JIT would be able to stack-allocate a key tuple.
 * <p>
 * The internal hash table does not expand at runtime; instead it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
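 * <p>
 * A minimal configuration sketch (the sizes below are illustrative values,
 * not recommendations):
 *
 * <pre>
 * DfsBlockCache.reconfigure(new DfsBlockCacheConfig()
 * 		.setBlockSize(64 * 1024) // must be a power of 2
 * 		.setBlockLimit(256 * 1024 * 1024));
 * DfsBlockCache c = DfsBlockCache.getInstance();
 * </pre>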
 */
public final class DfsBlockCache {
	private static volatile DfsBlockCache cache;

	static {
		reconfigure(new DfsBlockCacheConfig());
	}

	/**
	 * Modify the configuration of the window cache.
	 * <p>
	 * The new configuration is applied immediately, and the existing cache is
	 * cleared.
	 *
	 * @param cfg
	 *            the new window cache configuration.
	 * @throws java.lang.IllegalArgumentException
	 *             if the cache configuration contains one or more invalid
	 *             settings, usually a limit set too low.
	 */
	public static void reconfigure(DfsBlockCacheConfig cfg) {
		cache = new DfsBlockCache(cfg);
	}

	/**
	 * Get the currently active DfsBlockCache.
	 *
	 * @return the currently active DfsBlockCache.
	 */
	public static DfsBlockCache getInstance() {
		return cache;
	}

	/** Number of entries in {@link #table}. */
	private final int tableSize;

	/** Hash bucket directory; entries are chained below. */
	private final AtomicReferenceArray<HashEntry> table;

	/** Locks to prevent concurrent loads for same (PackFile,position). */
	private final ReentrantLock[] loadLocks;

	/** Maximum number of bytes the cache should hold. */
	private final long maxBytes;

	/** Pack files smaller than this size can be copied through the cache. */
	private final long maxStreamThroughCache;

	/**
	 * Suggested block size to read from pack files in.
	 * <p>
	 * If a pack file does not have a native block size, this size will be used.
	 * <p>
	 * If a pack file has a native size, a whole multiple of the native size
	 * will be used until it matches this size.
	 * <p>
	 * The value for blockSize must be a power of 2.
	 */
	private final int blockSize;

	/** As {@link #blockSize} is a power of 2, bits to shift to divide by it. */
	private final int blockSizeShift;

	/**
	 * Number of times a block was found in the cache, per pack file extension.
	 */
	private final AtomicReference<AtomicLong[]> statHit;

	/**
	 * Number of times a block was not found, and had to be loaded, per pack
	 * file extension.
	 */
	private final AtomicReference<AtomicLong[]> statMiss;

	/**
	 * Number of blocks evicted due to cache being full, per pack file
	 * extension.
	 */
	private final AtomicReference<AtomicLong[]> statEvict;

	/**
	 * Number of bytes currently loaded in the cache, per pack file extension.
	 */
	private final AtomicReference<AtomicLong[]> liveBytes;

	/** Protects the clock and its related data. */
	private final ReentrantLock clockLock;

	/** Current position of the clock. */
	private Ref clockHand;

	@SuppressWarnings("unchecked")
	private DfsBlockCache(DfsBlockCacheConfig cfg) {
		tableSize = tableSize(cfg);
		if (tableSize < 1)
			throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);

		table = new AtomicReferenceArray<>(tableSize);
		loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
		for (int i = 0; i < loadLocks.length; i++)
			loadLocks[i] = new ReentrantLock(true /* fair */);

		maxBytes = cfg.getBlockLimit();
		maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
		blockSize = cfg.getBlockSize();
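		// blockSize is a power of 2, so numberOfTrailingZeros(blockSize)
		// equals log2(blockSize): (off >>> blockSizeShift) == off / blockSize.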
		blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

		clockLock = new ReentrantLock(true /* fair */);
		String none = ""; //$NON-NLS-1$
		clockHand = new Ref<>(
				DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
				-1, 0, null);
		clockHand.next = clockHand;

		statHit = new AtomicReference<>(newCounters());
		statMiss = new AtomicReference<>(newCounters());
		statEvict = new AtomicReference<>(newCounters());
		liveBytes = new AtomicReference<>(newCounters());
	}

	boolean shouldCopyThroughCache(long length) {
		return length <= maxStreamThroughCache;
	}

	/**
	 * Get total number of bytes in the cache, per pack file extension.
	 *
	 * @return total number of bytes in the cache, per pack file extension.
	 */
	public long[] getCurrentSize() {
		return getStatVals(liveBytes);
	}

	/**
	 * Get how full the cache is, as a percentage.
	 *
	 * @return percentage, 0..100, of the configured maximum size that is
	 *         currently in use.
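	 *         For example, a cache configured for 100 MiB that currently
	 *         holds 25 MiB of blocks reports 25.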
	 */
	public long getFillPercentage() {
		return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
	}

	/**
	 * Get number of requests for items in the cache, per pack file extension.
	 *
	 * @return number of requests for items in the cache, per pack file
	 *         extension.
	 */
	public long[] getHitCount() {
		return getStatVals(statHit);
	}

	/**
	 * Get number of requests for items not in the cache, per pack file
	 * extension.
	 *
	 * @return number of requests for items not in the cache, per pack file
	 *         extension.
	 */
	public long[] getMissCount() {
		return getStatVals(statMiss);
	}

	/**
	 * Get total number of requests (hit + miss), per pack file extension.
	 *
	 * @return total number of requests (hit + miss), per pack file extension.
	 */
	public long[] getTotalRequestCount() {
		AtomicLong[] hit = statHit.get();
		AtomicLong[] miss = statMiss.get();
		long[] cnt = new long[Math.max(hit.length, miss.length)];
		for (int i = 0; i < hit.length; i++) {
			cnt[i] += hit[i].get();
		}
		for (int i = 0; i < miss.length; i++) {
			cnt[i] += miss[i].get();
		}
		return cnt;
	}

	/**
	 * Get the hit ratio, 0..100, per pack file extension.
	 *
	 * @return the hit ratio, 0..100, per pack file extension.
	 */
	public long[] getHitRatio() {
		AtomicLong[] hit = statHit.get();
		AtomicLong[] miss = statMiss.get();
		long[] ratio = new long[Math.max(hit.length, miss.length)];
		for (int i = 0; i < ratio.length; i++) {
			if (i >= hit.length) {
				ratio[i] = 0;
			} else if (i >= miss.length) {
				ratio[i] = 100;
			} else {
				long hitVal = hit[i].get();
				long missVal = miss[i].get();
				long total = hitVal + missVal;
				ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
			}
		}
		return ratio;
	}

	/**
	 * Get number of evictions performed due to cache being full, per pack file
	 * extension.
	 *
	 * @return number of evictions performed due to cache being full, per pack
	 *         file extension.
	 */
	public long[] getEvictions() {
		return getStatVals(statEvict);
	}

	/**
	 * Quickly check if the cache contains block 0 of the given stream.
	 * <p>
	 * This can be useful for sophisticated pre-read algorithms to quickly
	 * determine if a file is likely already in cache, especially small
	 * reftables which may be smaller than a typical DFS block size.
	 *
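	 * <p>
	 * An illustrative check; {@code key} here stands in for the
	 * {@link DfsStreamKey} of an existing file:
	 *
	 * <pre>
	 * if (DfsBlockCache.getInstance().hasBlock0(key)) {
	 * 	// block 0 is resident; a small reftable is likely fully cached
	 * }
	 * </pre>
	 *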
	 * @param key
	 *            the file to check.
	 * @return true if block 0 (the first block) is in the cache.
	 */
	public boolean hasBlock0(DfsStreamKey key) {
		HashEntry e1 = table.get(slot(key, 0));
		DfsBlock v = scan(e1, key, 0);
		return v != null && v.contains(key, 0);
	}

	private int hash(int packHash, long off) {
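		// Mix the block index into the pack's hash so consecutive blocks
		// of the same file map to different buckets.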
		return packHash + (int) (off >>> blockSizeShift);
	}

	int getBlockSize() {
		return blockSize;
	}

	private static int tableSize(DfsBlockCacheConfig cfg) {
		final int wsz = cfg.getBlockSize();
		final long limit = cfg.getBlockLimit();
		if (wsz <= 0)
			throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
		if (limit < wsz)
			throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
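		// Size the table at roughly 2.5 buckets per block the configured
		// limit can hold, keeping hash chains short.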
		return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
	}

	/**
	 * Look up a cached object, creating and loading it if it doesn't exist.
	 *
	 * @param file
	 *            the pack that "contains" the cached object.
	 * @param position
	 *            offset within <code>file</code> of the object.
	 * @param ctx
	 *            current thread's reader.
	 * @param fileChannel
	 *            optional channel to read {@code file}.
	 * @return the object reference.
	 * @throws IOException
	 *             if the reference was not in the cache and could not be
	 *             loaded.
	 */
	DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
			@Nullable ReadableChannel fileChannel) throws IOException {
		final long requestedPosition = position;
		position = file.alignToBlock(position);

		DfsStreamKey key = file.key;
		int slot = slot(key, position);
		HashEntry e1 = table.get(slot);
		DfsBlock v = scan(e1, key, position);
		if (v != null && v.contains(key, requestedPosition)) {
			ctx.stats.blockCacheHit++;
			getStat(statHit, key).incrementAndGet();
			return v;
		}

		reserveSpace(blockSize, key);
		ReentrantLock regionLock = lockFor(key, position);
		regionLock.lock();
		try {
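			// Another thread may have loaded the block while this thread
			// waited on the region lock; re-scan before doing any IO.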
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				v = scan(e2, key, position);
				if (v != null) {
					ctx.stats.blockCacheHit++;
					getStat(statHit, key).incrementAndGet();
					creditSpace(blockSize, key);
					return v;
				}
			}

			getStat(statMiss, key).incrementAndGet();
			boolean credit = true;
			try {
				v = file.readOneBlock(requestedPosition, ctx, fileChannel);
				credit = false;
			} finally {
				if (credit)
					creditSpace(blockSize, key);
			}
			if (position != v.start) {
				// The file discovered its blockSize and adjusted.
				position = v.start;
				slot = slot(key, position);
				e2 = table.get(slot);
			}

			Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
			ref.hot = true;
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n))
					break;
				e2 = table.get(slot);
			}
			addToClock(ref, blockSize - v.size());
		} finally {
			regionLock.unlock();
		}

		// If the block size changed from the default, it is possible the block
		// that was loaded is the wrong block for the requested position.
		if (v.contains(file.key, requestedPosition))
			return v;
		return getOrLoad(file, requestedPosition, ctx, fileChannel);
	}

	@SuppressWarnings("unchecked")
	private void reserveSpace(int reserve, DfsStreamKey key) {
		clockLock.lock();
		try {
			long live = LongStream.of(getCurrentSize()).sum() + reserve;
			if (maxBytes < live) {
				Ref prev = clockHand;
				Ref hand = clockHand.next;
				do {
					if (hand.hot) {
						// Value was recently touched. Clear
						// hot and give it another chance.
						hand.hot = false;
						prev = hand;
						hand = hand.next;
						continue;
					} else if (prev == hand)
						break;

					// No recent access since last scan, kill
					// value and remove from clock.
					Ref dead = hand;
					hand = hand.next;
					prev.next = hand;
					dead.next = null;
					dead.value = null;
					live -= dead.size;
					getStat(liveBytes, dead.key).addAndGet(-dead.size);
					getStat(statEvict, dead.key).incrementAndGet();
				} while (maxBytes < live);
				clockHand = prev;
			}
			getStat(liveBytes, key).addAndGet(reserve);
		} finally {
			clockLock.unlock();
		}
	}

	private void creditSpace(int credit, DfsStreamKey key) {
		clockLock.lock();
		try {
			getStat(liveBytes, key).addAndGet(-credit);
		} finally {
			clockLock.unlock();
		}
	}

	@SuppressWarnings("unchecked")
	private void addToClock(Ref ref, int credit) {
		clockLock.lock();
		try {
			if (credit != 0) {
				getStat(liveBytes, ref.key).addAndGet(-credit);
			}
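			// Link the new ref in immediately after the hand, then advance
			// the hand so the next sweep reaches this ref last.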
			Ref ptr = clockHand;
			ref.next = ptr.next;
			ptr.next = ref;
			clockHand = ref;
		} finally {
			clockLock.unlock();
		}
	}

	void put(DfsBlock v) {
		put(v.stream, v.start, v.size(), v);
	}

	<T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
		return put(key, 0, (int) Math.min(size, Integer.MAX_VALUE), v);
	}

	<T> Ref<T> put(DfsStreamKey key, long pos, int size, T v) {
		int slot = slot(key, pos);
		HashEntry e1 = table.get(slot);
		Ref<T> ref = scanRef(e1, key, pos);
		if (ref != null)
			return ref;

		reserveSpace(size, key);
		ReentrantLock regionLock = lockFor(key, pos);
		regionLock.lock();
		try {
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				ref = scanRef(e2, key, pos);
				if (ref != null) {
					creditSpace(size, key);
					return ref;
				}
			}

			ref = new Ref<>(key, pos, size, v);
			ref.hot = true;
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n))
					break;
				e2 = table.get(slot);
			}
			addToClock(ref, 0);
		} finally {
			regionLock.unlock();
		}
		return ref;
	}

	boolean contains(DfsStreamKey key, long position) {
		return scan(table.get(slot(key, position)), key, position) != null;
	}

	@SuppressWarnings("unchecked")
	<T> T get(DfsStreamKey key, long position) {
		T val = (T) scan(table.get(slot(key, position)), key, position);
		if (val == null)
			getStat(statMiss, key).incrementAndGet();
		else
			getStat(statHit, key).incrementAndGet();
		return val;
	}

	private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
		Ref<T> r = scanRef(n, key, position);
		return r != null ? r.get() : null;
	}

	<T> Ref<T> getRef(DfsStreamKey key) {
		Ref<T> r = scanRef(table.get(slot(key, 0)), key, 0);
		if (r != null)
			getStat(statHit, key).incrementAndGet();
		else
			getStat(statMiss, key).incrementAndGet();
		return r;
	}

	@SuppressWarnings("unchecked")
	private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
		for (; n != null; n = n.next) {
			Ref<T> r = n.ref;
			if (r.position == position && r.key.equals(key))
				return r.get() != null ? r : null;
		}
		return null;
	}

	private int slot(DfsStreamKey key, long position) {
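		// hash may be negative; >>> 1 drops the sign bit so the modulus
		// below is never negative.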
		return (hash(key.hash, position) >>> 1) % tableSize;
	}

	private ReentrantLock lockFor(DfsStreamKey key, long position) {
		return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
	}

	private static AtomicLong[] newCounters() {
		AtomicLong[] ret = new AtomicLong[PackExt.values().length];
		for (int i = 0; i < ret.length; i++) {
			ret[i] = new AtomicLong();
		}
		return ret;
	}

	private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
			DfsStreamKey key) {
		int pos = key.packExtPos;
		while (true) {
			AtomicLong[] vals = stats.get();
			if (pos < vals.length) {
				return vals[pos];
			}
			// Counter array is too small for this extension; grow it by
			// copy-on-write, retrying if another thread swapped in a new
			// array first.
			AtomicLong[] expect = vals;
			vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
			System.arraycopy(expect, 0, vals, 0, expect.length);
			for (int i = expect.length; i < vals.length; i++) {
				vals[i] = new AtomicLong();
			}
			if (stats.compareAndSet(expect, vals)) {
				return vals[pos];
			}
		}
	}

	private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
		AtomicLong[] stats = stat.get();
		long[] cnt = new long[stats.length];
		for (int i = 0; i < stats.length; i++) {
			cnt[i] = stats[i].get();
		}
		return cnt;
	}

	private static HashEntry clean(HashEntry top) {
		// Drop entries whose Ref was evicted from the clock; eviction
		// clears Ref.next, so those entries no longer hold a live value.
		while (top != null && top.ref.next == null)
			top = top.next;
		if (top == null)
			return null;
		HashEntry n = clean(top.next);
		return n == top.next ? top : new HashEntry(n, top.ref);
	}

	private static final class HashEntry {
		/** Next entry in the hash table's chain list. */
		final HashEntry next;

		/** The referenced object. */
		final Ref ref;

		HashEntry(HashEntry n, Ref r) {
			next = n;
			ref = r;
		}
	}

	static final class Ref<T> {
		final DfsStreamKey key;
		final long position;
		final int size;
		volatile T value;
		Ref next;
		volatile boolean hot;

		Ref(DfsStreamKey key, long position, int size, T v) {
			this.key = key;
			this.position = position;
			this.size = size;
			this.value = v;
		}

		T get() {
			T v = value;
			if (v != null)
				hot = true;
			return v;
		}

		boolean has() {
			return value != null;
		}
	}
}