/*
 * Copyright (C) 2008-2011, Google Inc.
 * Copyright (C) 2008, Shawn O. Pearce <spearce@spearce.org> and others
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Distribution License v. 1.0 which is available at
 * https://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: BSD-3-Clause
 */

package org.eclipse.jgit.internal.storage.dfs;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.LongStream;

import org.eclipse.jgit.internal.JGitText;
import org.eclipse.jgit.internal.storage.pack.PackExt;

/**
 * Caches slices of a
 * {@link org.eclipse.jgit.internal.storage.dfs.BlockBasedFile} in memory for
 * faster read access.
 * <p>
 * The DfsBlockCache serves as a Java based "buffer cache", loading segments of
 * a BlockBasedFile into the JVM heap prior to use. As JGit often wants to do
 * reads of only tiny slices of a file, the DfsBlockCache tries to smooth out
 * these tiny reads into larger block-sized IO operations.
 * <p>
 * Whenever a cache miss occurs, loading is invoked by exactly one thread for
 * the given <code>(DfsStreamKey,position)</code> key tuple. This is ensured by
 * an array of locks, with the tuple hashed to a lock instance.
 * <p>
 * It is too expensive during object access to be accurate with a least
 * recently used (LRU) algorithm. Strictly ordering every read is a lot of
 * overhead that typically doesn't yield a corresponding benefit to the
 * application. This cache implements a clock replacement algorithm, giving
 * each block at least one chance to have been accessed during a sweep of the
 * cache to save itself from eviction. The number of sweep chances is
 * configurable per pack extension.
 * <p>
 * Entities created by the cache are held under hard references, preventing the
 * Java VM from clearing anything. Blocks are discarded by the replacement
 * algorithm when adding a new block would cause the cache to exceed its
 * configured maximum size.
 * <p>
 * The key tuple is passed through to methods as a pair of parameters rather
 * than as a single Object, thus reducing the transient memory allocations of
 * callers. It is more efficient to avoid the allocation, as we can't be 100%
 * sure that a JIT would be able to stack-allocate a key tuple.
 * <p>
 * The internal hash table does not expand at runtime; instead it is fixed in
 * size at cache creation time. The internal lock table used to gate load
 * invocations is also fixed in size.
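 * <p>
 * A typical setup configures the cache once at application startup. The
 * values below are purely illustrative, not recommended defaults:
 *
 * <pre>{@code
 * DfsBlockCacheConfig cfg = new DfsBlockCacheConfig()
 * 		.setBlockSize(64 * 1024) // 64 KiB blocks, a power of 2
 * 		.setBlockLimit(256L * 1024 * 1024); // hold at most ~256 MiB
 * DfsBlockCache.reconfigure(cfg);
 * DfsBlockCache cache = DfsBlockCache.getInstance();
 * }</pre>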
 */
public final class DfsBlockCache {
	private static volatile DfsBlockCache cache;

	static {
		reconfigure(new DfsBlockCacheConfig());
	}

	/**
	 * Modify the configuration of the block cache.
	 * <p>
	 * The new configuration is applied immediately, and the existing cache is
	 * cleared.
	 *
	 * @param cfg
	 *            the new block cache configuration.
	 * @throws java.lang.IllegalArgumentException
	 *             the cache configuration contains one or more invalid
	 *             settings, usually a limit set too low.
	 */
	public static void reconfigure(DfsBlockCacheConfig cfg) {
		cache = new DfsBlockCache(cfg);
	}

	/**
	 * Get the currently active DfsBlockCache.
	 *
	 * @return the currently active DfsBlockCache.
	 */
	public static DfsBlockCache getInstance() {
		return cache;
	}

	/** Number of entries in {@link #table}. */
	private final int tableSize;

	/** Hash bucket directory; entries are chained below. */
	private final AtomicReferenceArray<HashEntry> table;

	/**
	 * Locks to prevent concurrent loads of the same (PackFile, position)
	 * block. The number of locks is
	 * {@link DfsBlockCacheConfig#getConcurrencyLevel()} to cap the overall
	 * concurrent block loads.
	 */
	private final ReentrantLock[] loadLocks;

	/**
	 * A separate pool of locks per pack extension to prevent concurrent loads
	 * of the same index or bitmap from a PackFile.
	 */
	private final ReentrantLock[][] refLocks;

	/** Maximum number of bytes the cache should hold. */
	private final long maxBytes;

	/** Pack files smaller than this size can be copied through the cache. */
	private final long maxStreamThroughCache;

	/**
	 * Suggested block size to read from pack files in.
	 * <p>
	 * If a pack file does not have a native block size, this size will be used.
	 * <p>
	 * If a pack file has a native size, a whole multiple of the native size
	 * will be used until it matches this size.
	 * <p>
	 * The value for blockSize must be a power of 2.
	 */
	private final int blockSize;

	/**
	 * As {@link #blockSize} is a power of 2, the number of bits to shift by to
	 * divide by blockSize.
	 */
	private final int blockSizeShift;

	/**
	 * Number of times a block was found in the cache, per pack file extension.
	 */
	private final AtomicReference<AtomicLong[]> statHit;

	/**
	 * Number of times a block was not found, and had to be loaded, per pack
	 * file extension.
	 */
	private final AtomicReference<AtomicLong[]> statMiss;

	/**
	 * Number of blocks evicted due to the cache being full, per pack file
	 * extension.
	 */
	private final AtomicReference<AtomicLong[]> statEvict;

	/**
	 * Number of bytes currently loaded in the cache, per pack file extension.
	 */
	private final AtomicReference<AtomicLong[]> liveBytes;

	/** Protects the clock and its related data. */
	private final ReentrantLock clockLock;

	/**
	 * Consumer of object reference lock wait times, in milliseconds. May be
	 * used to build a metric.
	 */
	private final Consumer<Long> refLockWaitTime;

	/** Current position of the clock. */
	private Ref clockHand;

	/** Limits of cache hot count per pack file extension. */
	private final int[] cacheHotLimits = new int[PackExt.values().length];

	/** Consumer of loading and eviction events of indexes. */
	private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer;

	/** Stores timestamps of the last eviction of indexes. */
	private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>();

	@SuppressWarnings("unchecked")
	private DfsBlockCache(DfsBlockCacheConfig cfg) {
		tableSize = tableSize(cfg);
		if (tableSize < 1) {
			throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
		}

		table = new AtomicReferenceArray<>(tableSize);
		int concurrencyLevel = cfg.getConcurrencyLevel();
		loadLocks = new ReentrantLock[concurrencyLevel];
		for (int i = 0; i < loadLocks.length; i++) {
			loadLocks[i] = new ReentrantLock(true /* fair */);
		}
		refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel];
		for (int i = 0; i < PackExt.values().length; i++) {
			for (int j = 0; j < concurrencyLevel; ++j) {
				refLocks[i][j] = new ReentrantLock(true /* fair */);
			}
		}

		maxBytes = cfg.getBlockLimit();
		maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
		blockSize = cfg.getBlockSize();
		blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

		clockLock = new ReentrantLock(true /* fair */);
		String none = ""; //$NON-NLS-1$
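		// Sentinel Ref: the clock is a circular linked list, and the hand
		// starts at this placeholder entry that links back to itself.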
		clockHand = new Ref<>(
				DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
				-1, 0, null);
		clockHand.next = clockHand;

		statHit = new AtomicReference<>(newCounters());
		statMiss = new AtomicReference<>(newCounters());
		statEvict = new AtomicReference<>(newCounters());
		liveBytes = new AtomicReference<>(newCounters());

		refLockWaitTime = cfg.getRefLockWaitTimeConsumer();

		for (int i = 0; i < PackExt.values().length; ++i) {
			Integer limit = cfg.getCacheHotMap().get(PackExt.values()[i]);
			if (limit != null && limit.intValue() > 0) {
				cacheHotLimits[i] = limit.intValue();
			} else {
				cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX;
			}
		}
		indexEventConsumer = cfg.getIndexEventConsumer();
	}

	boolean shouldCopyThroughCache(long length) {
		return length <= maxStreamThroughCache;
	}

	/**
	 * Get total number of bytes in the cache, per pack file extension.
	 *
	 * @return total number of bytes in the cache, per pack file extension.
	 */
	public long[] getCurrentSize() {
		return getStatVals(liveBytes);
	}

	/**
	 * Get how full the cache is, as a percentage in the range 0..100.
	 *
	 * @return how full the cache is, as a percentage in the range 0..100.
	 */
	public long getFillPercentage() {
		return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
	}

	/**
	 * Get number of requests for items in the cache, per pack file extension.
	 *
	 * @return number of requests for items in the cache, per pack file
	 *         extension.
	 */
	public long[] getHitCount() {
		return getStatVals(statHit);
	}

	/**
	 * Get number of requests for items not in the cache, per pack file
	 * extension.
	 *
	 * @return number of requests for items not in the cache, per pack file
	 *         extension.
	 */
	public long[] getMissCount() {
		return getStatVals(statMiss);
	}

	/**
	 * Get total number of requests (hit + miss), per pack file extension.
	 *
	 * @return total number of requests (hit + miss), per pack file extension.
	 */
	public long[] getTotalRequestCount() {
		AtomicLong[] hit = statHit.get();
		AtomicLong[] miss = statMiss.get();
		long[] cnt = new long[Math.max(hit.length, miss.length)];
		for (int i = 0; i < hit.length; i++) {
			cnt[i] += hit[i].get();
		}
		for (int i = 0; i < miss.length; i++) {
			cnt[i] += miss[i].get();
		}
		return cnt;
	}

	/**
	 * Get the cache hit ratio, in the range 0..100, per pack file extension.
	 *
	 * @return the cache hit ratio, in the range 0..100, per pack file
	 *         extension.
	 */
	public long[] getHitRatio() {
		AtomicLong[] hit = statHit.get();
		AtomicLong[] miss = statMiss.get();
		long[] ratio = new long[Math.max(hit.length, miss.length)];
		for (int i = 0; i < ratio.length; i++) {
			if (i >= hit.length) {
				ratio[i] = 0;
			} else if (i >= miss.length) {
				ratio[i] = 100;
			} else {
				long hitVal = hit[i].get();
				long missVal = miss[i].get();
				long total = hitVal + missVal;
				ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
			}
		}
		return ratio;
	}

	/**
	 * Get number of evictions performed due to the cache being full, per pack
	 * file extension.
	 *
	 * @return number of evictions performed due to the cache being full, per
	 *         pack file extension.
	 */
	public long[] getEvictions() {
		return getStatVals(statEvict);
	}
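
	// Illustrative monitoring sketch (not part of the cache API): indexes in
	// the arrays returned by the statistics getters correspond to
	// PackExt.values(), so per-extension values can be reported as:
	//
	//   long[] ratio = DfsBlockCache.getInstance().getHitRatio();
	//   for (PackExt ext : PackExt.values()) {
	//       System.out.println(ext.getExtension() + ": "
	//               + ratio[ext.getPosition()] + "%");
	//   }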

	/**
	 * Quickly check if the cache contains block 0 of the given stream.
	 * <p>
	 * This can be useful for sophisticated pre-read algorithms to quickly
	 * determine if a file is likely already in cache, especially small
	 * reftables which may be smaller than a typical DFS block size.
	 *
	 * @param key
	 *            the file to check.
	 * @return true if block 0 (the first block) is in the cache.
	 */
	public boolean hasBlock0(DfsStreamKey key) {
		HashEntry e1 = table.get(slot(key, 0));
		DfsBlock v = scan(e1, key, 0);
		return v != null && v.contains(key, 0);
	}

	private int hash(int packHash, long off) {
		return packHash + (int) (off >>> blockSizeShift);
	}

	int getBlockSize() {
		return blockSize;
	}

	private static int tableSize(DfsBlockCacheConfig cfg) {
		final int wsz = cfg.getBlockSize();
		final long limit = cfg.getBlockLimit();
		if (wsz <= 0) {
			throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
		}
		if (limit < wsz) {
			throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
		}
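		// Size the table at ~2.5x the maximum number of blocks the cache can
		// hold, keeping hash chains short.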
		return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
	}

	/**
	 * Look up a cached object, creating and loading it if it doesn't exist.
	 *
	 * @param file
	 *            the pack that "contains" the cached object.
	 * @param position
	 *            offset within <code>file</code> of the object.
	 * @param ctx
	 *            current thread's reader.
	 * @param fileChannel
	 *            supplier for the channel to read {@code file}.
	 * @return the object reference.
	 * @throws IOException
	 *             the reference was not in the cache and could not be loaded.
	 */
	DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
			ReadableChannelSupplier fileChannel) throws IOException {
		final long requestedPosition = position;
		position = file.alignToBlock(position);

		DfsStreamKey key = file.key;
		int slot = slot(key, position);
		HashEntry e1 = table.get(slot);
		DfsBlock v = scan(e1, key, position);
		if (v != null && v.contains(key, requestedPosition)) {
			ctx.stats.blockCacheHit++;
			getStat(statHit, key).incrementAndGet();
			return v;
		}

		reserveSpace(blockSize, key);
		ReentrantLock regionLock = lockFor(key, position);
		regionLock.lock();
		try {
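			// Recheck the slot under the lock: another thread may have loaded
			// and published this block while we waited.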
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				v = scan(e2, key, position);
				if (v != null) {
					ctx.stats.blockCacheHit++;
					getStat(statHit, key).incrementAndGet();
					creditSpace(blockSize, key);
					return v;
				}
			}

			getStat(statMiss, key).incrementAndGet();
			boolean credit = true;
			try {
				v = file.readOneBlock(position, ctx, fileChannel.get());
				credit = false;
			} finally {
				if (credit) {
					creditSpace(blockSize, key);
				}
			}
			if (position != v.start) {
				// The file discovered its blockSize and adjusted.
				position = v.start;
				slot = slot(key, position);
				e2 = table.get(slot);
			}

			Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
			ref.markHotter();
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n)) {
					break;
				}
				e2 = table.get(slot);
			}
			addToClock(ref, blockSize - v.size());
		} finally {
			regionLock.unlock();
		}

		// If the block size changed from the default, it is possible the block
		// that was loaded is the wrong block for the requested position.
		if (v.contains(file.key, requestedPosition)) {
			return v;
		}
		return getOrLoad(file, requestedPosition, ctx, fileChannel);
	}

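	/**
	 * Reserve {@code reserve} bytes of space for {@code key}, evicting cold
	 * entries with the clock hand while the cache would exceed its configured
	 * maximum size.
	 */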
	@SuppressWarnings("unchecked")
	private void reserveSpace(long reserve, DfsStreamKey key) {
		clockLock.lock();
		try {
			long live = LongStream.of(getCurrentSize()).sum() + reserve;
			if (maxBytes < live) {
				Ref prev = clockHand;
				Ref hand = clockHand.next;
				do {
					if (hand.isHot()) {
						// Value was recently touched. Cache is still hot so
						// give it another chance, but cool it down a bit.
						hand.markColder();
						prev = hand;
						hand = hand.next;
						continue;
					} else if (prev == hand) {
						break;
					}

					// No recent access since last scan, kill
					// value and remove from clock.
					Ref dead = hand;
					hand = hand.next;
					prev.next = hand;
					dead.next = null;
					dead.value = null;
					live -= dead.size;
					getStat(liveBytes, dead.key).addAndGet(-dead.size);
					getStat(statEvict, dead.key).incrementAndGet();
					reportIndexEvicted(dead);
				} while (maxBytes < live);
				clockHand = prev;
			}
			getStat(liveBytes, key).addAndGet(reserve);
		} finally {
			clockLock.unlock();
		}
	}

	private void creditSpace(long credit, DfsStreamKey key) {
		clockLock.lock();
		try {
			getStat(liveBytes, key).addAndGet(-credit);
		} finally {
			clockLock.unlock();
		}
	}

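	// Insert the new Ref just after the clock hand and advance the hand onto
	// it, so the newest entry is the last one the next sweep reaches.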
	@SuppressWarnings("unchecked")
	private void addToClock(Ref ref, long credit) {
		clockLock.lock();
		try {
			if (credit != 0) {
				getStat(liveBytes, ref.key).addAndGet(-credit);
			}
			Ref ptr = clockHand;
			ref.next = ptr.next;
			ptr.next = ref;
			clockHand = ref;
		} finally {
			clockLock.unlock();
		}
	}

	void put(DfsBlock v) {
		put(v.stream, v.start, v.size(), v);
	}

	/**
	 * Look up a cached object, creating and loading it if it doesn't exist.
	 *
	 * @param key
	 *            the stream key of the pack.
	 * @param position
	 *            the position in the key. The default should be 0.
	 * @param loader
	 *            the function to load the reference.
	 * @return the object reference.
	 * @throws IOException
	 *             the reference was not in the cache and could not be loaded.
	 */
	<T> Ref<T> getOrLoadRef(
			DfsStreamKey key, long position, RefLoader<T> loader)
			throws IOException {
		long start = System.nanoTime();
		int slot = slot(key, position);
		HashEntry e1 = table.get(slot);
		Ref<T> ref = scanRef(e1, key, position);
		if (ref != null) {
			getStat(statHit, key).incrementAndGet();
			reportIndexRequested(ref, true /* cacheHit */, start);
			return ref;
		}

		ReentrantLock regionLock = lockForRef(key);
		long lockStart = System.currentTimeMillis();
		regionLock.lock();
		try {
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				ref = scanRef(e2, key, position);
				if (ref != null) {
					getStat(statHit, key).incrementAndGet();
					reportIndexRequested(ref, true /* cacheHit */,
							start);
					return ref;
				}
			}

			if (refLockWaitTime != null) {
				refLockWaitTime.accept(
						Long.valueOf(System.currentTimeMillis() - lockStart));
			}
			getStat(statMiss, key).incrementAndGet();
			ref = loader.load();
			ref.markHotter();
			// Reserve after loading to get the size of the object
			reserveSpace(ref.size, key);
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n)) {
					break;
				}
				e2 = table.get(slot);
			}
			addToClock(ref, 0);
		} finally {
			regionLock.unlock();
		}
		reportIndexRequested(ref, false /* cacheHit */, start);
		return ref;
	}

	<T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
		return put(key, 0, size, v);
	}

	<T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
		int slot = slot(key, pos);
		HashEntry e1 = table.get(slot);
		Ref<T> ref = scanRef(e1, key, pos);
		if (ref != null) {
			return ref;
		}

		reserveSpace(size, key);
		ReentrantLock regionLock = lockFor(key, pos);
		regionLock.lock();
		try {
			HashEntry e2 = table.get(slot);
			if (e2 != e1) {
				ref = scanRef(e2, key, pos);
				if (ref != null) {
					creditSpace(size, key);
					return ref;
				}
			}

			ref = new Ref<>(key, pos, size, v);
			ref.markHotter();
			for (;;) {
				HashEntry n = new HashEntry(clean(e2), ref);
				if (table.compareAndSet(slot, e2, n)) {
					break;
				}
				e2 = table.get(slot);
			}
			addToClock(ref, 0);
		} finally {
			regionLock.unlock();
		}
		return ref;
	}

	boolean contains(DfsStreamKey key, long position) {
		return scan(table.get(slot(key, position)), key, position) != null;
	}

	@SuppressWarnings("unchecked")
	<T> T get(DfsStreamKey key, long position) {
		T val = (T) scan(table.get(slot(key, position)), key, position);
		if (val == null) {
			getStat(statMiss, key).incrementAndGet();
		} else {
			getStat(statHit, key).incrementAndGet();
		}
		return val;
	}

	private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
		Ref<T> r = scanRef(n, key, position);
		return r != null ? r.get() : null;
	}

	@SuppressWarnings("unchecked")
	private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
		for (; n != null; n = n.next) {
			Ref<T> r = n.ref;
			if (r.position == position && r.key.equals(key)) {
				return r.get() != null ? r : null;
			}
		}
		return null;
	}

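	// The unsigned shift below discards the sign bit so the modulus result is
	// always a valid, non-negative array index.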
	private int slot(DfsStreamKey key, long position) {
		return (hash(key.hash, position) >>> 1) % tableSize;
	}

	private ReentrantLock lockFor(DfsStreamKey key, long position) {
		return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
	}

	private ReentrantLock lockForRef(DfsStreamKey key) {
		int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length;
		return refLocks[key.packExtPos][slot];
	}

	private static AtomicLong[] newCounters() {
		AtomicLong[] ret = new AtomicLong[PackExt.values().length];
		for (int i = 0; i < ret.length; i++) {
			ret[i] = new AtomicLong();
		}
		return ret;
	}

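	// Returns the counter for the key's pack extension, growing the counter
	// array with a copy-and-CAS loop if the extension position is beyond the
	// current length.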
	private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
			DfsStreamKey key) {
		int pos = key.packExtPos;
		while (true) {
			AtomicLong[] vals = stats.get();
			if (pos < vals.length) {
				return vals[pos];
			}
			AtomicLong[] expect = vals;
			vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
			System.arraycopy(expect, 0, vals, 0, expect.length);
			for (int i = expect.length; i < vals.length; i++) {
				vals[i] = new AtomicLong();
			}
			if (stats.compareAndSet(expect, vals)) {
				return vals[pos];
			}
		}
	}

	private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
		AtomicLong[] stats = stat.get();
		long[] cnt = new long[stats.length];
		for (int i = 0; i < stats.length; i++) {
			cnt[i] = stats[i].get();
		}
		return cnt;
	}

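	// Rebuild a hash chain without entries whose Ref was unlinked from the
	// clock (ref.next == null implies the value was evicted); unchanged
	// suffixes of the chain are reused as-is.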
	private static HashEntry clean(HashEntry top) {
		while (top != null && top.ref.next == null) {
			top = top.next;
		}
		if (top == null) {
			return null;
		}
		HashEntry n = clean(top.next);
		return n == top.next ? top : new HashEntry(n, top.ref);
	}

	private void reportIndexRequested(Ref<?> ref, boolean cacheHit,
			long start) {
		if (indexEventConsumer == null
				|| !isIndexOrBitmapExtPos(ref.key.packExtPos)) {
			return;
		}
		EvictKey evictKey = new EvictKey(ref);
		Long prevEvictedTime = indexEvictionMap.get(evictKey);
		long now = System.nanoTime();
		long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
				: now - prevEvictedTime.longValue();
		indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit,
				(now - start) / 1000L /* micros */, ref.size,
				Duration.ofNanos(sinceLastEvictionNanos));
	}

	private void reportIndexEvicted(Ref<?> dead) {
		if (indexEventConsumer == null
				|| !indexEventConsumer.shouldReportEvictedEvent()
				|| !isIndexOrBitmapExtPos(dead.key.packExtPos)) {
			return;
		}
		EvictKey evictKey = new EvictKey(dead);
		Long prevEvictedTime = indexEvictionMap.get(evictKey);
		long now = System.nanoTime();
		long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
				: now - prevEvictedTime.longValue();
		indexEvictionMap.put(evictKey, Long.valueOf(now));
		indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size,
				dead.totalHitCount.get(),
				Duration.ofNanos(sinceLastEvictionNanos));
	}

	private static boolean isIndexOrBitmapExtPos(int packExtPos) {
		return packExtPos == PackExt.INDEX.getPosition()
				|| packExtPos == PackExt.BITMAP_INDEX.getPosition();
	}

	private static final class HashEntry {
		/** Next entry in the hash table's chain list. */
		final HashEntry next;

		/** The referenced object. */
		final Ref ref;

		HashEntry(HashEntry n, Ref r) {
			next = n;
			ref = r;
		}
	}

	static final class Ref<T> {
		final DfsStreamKey key;
		final long position;
		final long size;
		volatile T value;
		Ref next;

		private volatile int hotCount;
		private AtomicInteger totalHitCount = new AtomicInteger();

		Ref(DfsStreamKey key, long position, long size, T v) {
			this.key = key;
			this.position = position;
			this.size = size;
			this.value = v;
		}

		T get() {
			T v = value;
			if (v != null) {
				markHotter();
			}
			return v;
		}

		boolean has() {
			return value != null;
		}

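		/**
		 * Bump the hot count, capped by the per-extension limit, and record
		 * the hit in the lifetime total.
		 */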
		void markHotter() {
			int cap = DfsBlockCache
					.getInstance().cacheHotLimits[key.packExtPos];
			hotCount = Math.min(cap, hotCount + 1);
			totalHitCount.incrementAndGet();
		}

		void markColder() {
			hotCount = Math.max(0, hotCount - 1);
		}

		boolean isHot() {
			return hotCount > 0;
		}
	}

	private static final class EvictKey {
		private final int keyHash;
		private final int packExtPos;
		private final long position;

		EvictKey(Ref<?> ref) {
			keyHash = ref.key.hash;
			packExtPos = ref.key.packExtPos;
			position = ref.position;
		}

		@Override
		public boolean equals(Object object) {
			if (object instanceof EvictKey) {
				EvictKey other = (EvictKey) object;
				return keyHash == other.keyHash
						&& packExtPos == other.packExtPos
						&& position == other.position;
			}
			return false;
		}

		@Override
		public int hashCode() {
			return DfsBlockCache.getInstance().hash(keyHash, position);
		}
	}

	@FunctionalInterface
	interface RefLoader<T> {
		Ref<T> load() throws IOException;
	}

	/**
	 * Supplier for a readable channel.
	 */
	@FunctionalInterface
	interface ReadableChannelSupplier {
		/**
		 * @return ReadableChannel
		 * @throws IOException
		 *             if the channel could not be opened.
		 */
		ReadableChannel get() throws IOException;
	}
}