1
2
3
4
5
6
7
8
9
10
11
12 package org.eclipse.jgit.internal.storage.dfs;
13
14 import java.io.IOException;
15 import java.time.Duration;
16 import java.util.Map;
17 import java.util.concurrent.ConcurrentHashMap;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.atomic.AtomicReference;
21 import java.util.concurrent.atomic.AtomicReferenceArray;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.function.Consumer;
24 import java.util.stream.LongStream;
25
26 import org.eclipse.jgit.internal.JGitText;
27 import org.eclipse.jgit.internal.storage.pack.PackExt;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 public final class DfsBlockCache {
66 private static volatile DfsBlockCache cache;
67
68 static {
69 reconfigure(new DfsBlockCacheConfig());
70 }
71
72
73
74
75
76
77
78
79
80
81
82
83
84 public static void reconfigure(DfsBlockCacheConfig cfg) {
85 cache = new DfsBlockCache(cfg);
86 }
87
88
89
90
91
92
93 public static DfsBlockCache getInstance() {
94 return cache;
95 }
96
97
98 private final int tableSize;
99
100
101 private final AtomicReferenceArray<HashEntry> table;
102
103
104
105
106
107
108 private final ReentrantLock[] loadLocks;
109
110
111
112
113
114 private final ReentrantLock[][] refLocks;
115
116
117 private final long maxBytes;
118
119
120 private final long maxStreamThroughCache;
121
122
123
124
125
126
127
128
129
130
131
132 private final int blockSize;
133
134
135 private final int blockSizeShift;
136
137
138
139
140 private final AtomicReference<AtomicLong[]> statHit;
141
142
143
144
145
146 private final AtomicReference<AtomicLong[]> statMiss;
147
148
149
150
151
152 private final AtomicReference<AtomicLong[]> statEvict;
153
154
155
156
157 private final AtomicReference<AtomicLong[]> liveBytes;
158
159
160 private final ReentrantLock clockLock;
161
162
163
164
165 private final Consumer<Long> refLockWaitTime;
166
167
168 private Ref clockHand;
169
170
171 private final int[] cacheHotLimits = new int[PackExt.values().length];
172
173
174 private final DfsBlockCacheConfig.IndexEventConsumer indexEventConsumer;
175
176
177 private final Map<EvictKey, Long> indexEvictionMap = new ConcurrentHashMap<>();
178
179 @SuppressWarnings("unchecked")
180 private DfsBlockCache(DfsBlockCacheConfig cfg) {
181 tableSize = tableSize(cfg);
182 if (tableSize < 1) {
183 throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
184 }
185
186 table = new AtomicReferenceArray<>(tableSize);
187 int concurrencyLevel = cfg.getConcurrencyLevel();
188 loadLocks = new ReentrantLock[concurrencyLevel];
189 for (int i = 0; i < loadLocks.length; i++) {
190 loadLocks[i] = new ReentrantLock(true );
191 }
192 refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel];
193 for (int i = 0; i < PackExt.values().length; i++) {
194 for (int j = 0; j < concurrencyLevel; ++j) {
195 refLocks[i][j] = new ReentrantLock(true );
196 }
197 }
198
199 maxBytes = cfg.getBlockLimit();
200 maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
201 blockSize = cfg.getBlockSize();
202 blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
203
204 clockLock = new ReentrantLock(true );
205 String none = "";
206 clockHand = new Ref<>(
207 DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
208 -1, 0, null);
209 clockHand.next = clockHand;
210
211 statHit = new AtomicReference<>(newCounters());
212 statMiss = new AtomicReference<>(newCounters());
213 statEvict = new AtomicReference<>(newCounters());
214 liveBytes = new AtomicReference<>(newCounters());
215
216 refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
217
218 for (int i = 0; i < PackExt.values().length; ++i) {
219 Integer limit = cfg.getCacheHotMap().get(PackExt.values()[i]);
220 if (limit != null && limit.intValue() > 0) {
221 cacheHotLimits[i] = limit.intValue();
222 } else {
223 cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX;
224 }
225 }
226 indexEventConsumer = cfg.getIndexEventConsumer();
227 }
228
229 boolean shouldCopyThroughCache(long length) {
230 return length <= maxStreamThroughCache;
231 }
232
233
234
235
236
237
238 public long[] getCurrentSize() {
239 return getStatVals(liveBytes);
240 }
241
242
243
244
245
246
247 public long getFillPercentage() {
248 return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
249 }
250
251
252
253
254
255
256
257 public long[] getHitCount() {
258 return getStatVals(statHit);
259 }
260
261
262
263
264
265
266
267
268 public long[] getMissCount() {
269 return getStatVals(statMiss);
270 }
271
272
273
274
275
276
277 public long[] getTotalRequestCount() {
278 AtomicLong[] hit = statHit.get();
279 AtomicLong[] miss = statMiss.get();
280 long[] cnt = new long[Math.max(hit.length, miss.length)];
281 for (int i = 0; i < hit.length; i++) {
282 cnt[i] += hit[i].get();
283 }
284 for (int i = 0; i < miss.length; i++) {
285 cnt[i] += miss[i].get();
286 }
287 return cnt;
288 }
289
290
291
292
293
294
295 public long[] getHitRatio() {
296 AtomicLong[] hit = statHit.get();
297 AtomicLong[] miss = statMiss.get();
298 long[] ratio = new long[Math.max(hit.length, miss.length)];
299 for (int i = 0; i < ratio.length; i++) {
300 if (i >= hit.length) {
301 ratio[i] = 0;
302 } else if (i >= miss.length) {
303 ratio[i] = 100;
304 } else {
305 long hitVal = hit[i].get();
306 long missVal = miss[i].get();
307 long total = hitVal + missVal;
308 ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
309 }
310 }
311 return ratio;
312 }
313
314
315
316
317
318
319
320
321 public long[] getEvictions() {
322 return getStatVals(statEvict);
323 }
324
325
326
327
328
329
330
331
332
333
334
335
336 public boolean hasBlock0(DfsStreamKey key) {
337 HashEntry e1 = table.get(slot(key, 0));
338 DfsBlock v = scan(e1, key, 0);
339 return v != null && v.contains(key, 0);
340 }
341
342 private int hash(int packHash, long off) {
343 return packHash + (int) (off >>> blockSizeShift);
344 }
345
346 int getBlockSize() {
347 return blockSize;
348 }
349
350 private static int tableSize(DfsBlockCacheConfig cfg) {
351 final int wsz = cfg.getBlockSize();
352 final long limit = cfg.getBlockLimit();
353 if (wsz <= 0) {
354 throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
355 }
356 if (limit < wsz) {
357 throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
358 }
359 return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
360 }
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377 DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
378 ReadableChannelSupplier fileChannel) throws IOException {
379 final long requestedPosition = position;
380 position = file.alignToBlock(position);
381
382 DfsStreamKey key = file.key;
383 int slot = slot(key, position);
384 HashEntry e1 = table.get(slot);
385 DfsBlock v = scan(e1, key, position);
386 if (v != null && v.contains(key, requestedPosition)) {
387 ctx.stats.blockCacheHit++;
388 getStat(statHit, key).incrementAndGet();
389 return v;
390 }
391
392 reserveSpace(blockSize, key);
393 ReentrantLock regionLock = lockFor(key, position);
394 regionLock.lock();
395 try {
396 HashEntry e2 = table.get(slot);
397 if (e2 != e1) {
398 v = scan(e2, key, position);
399 if (v != null) {
400 ctx.stats.blockCacheHit++;
401 getStat(statHit, key).incrementAndGet();
402 creditSpace(blockSize, key);
403 return v;
404 }
405 }
406
407 getStat(statMiss, key).incrementAndGet();
408 boolean credit = true;
409 try {
410 v = file.readOneBlock(position, ctx, fileChannel.get());
411 credit = false;
412 } finally {
413 if (credit) {
414 creditSpace(blockSize, key);
415 }
416 }
417 if (position != v.start) {
418
419 position = v.start;
420 slot = slot(key, position);
421 e2 = table.get(slot);
422 }
423
424 Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
425 ref.markHotter();
426 for (;;) {
427 HashEntry n = new HashEntry(clean(e2), ref);
428 if (table.compareAndSet(slot, e2, n)) {
429 break;
430 }
431 e2 = table.get(slot);
432 }
433 addToClock(ref, blockSize - v.size());
434 } finally {
435 regionLock.unlock();
436 }
437
438
439
440 if (v.contains(file.key, requestedPosition)) {
441 return v;
442 }
443 return getOrLoad(file, requestedPosition, ctx, fileChannel);
444 }
445
446 @SuppressWarnings("unchecked")
447 private void reserveSpace(long reserve, DfsStreamKey key) {
448 clockLock.lock();
449 try {
450 long live = LongStream.of(getCurrentSize()).sum() + reserve;
451 if (maxBytes < live) {
452 Ref prev = clockHand;
453 Ref hand = clockHand.next;
454 do {
455 if (hand.isHot()) {
456
457
458 hand.markColder();
459 prev = hand;
460 hand = hand.next;
461 continue;
462 } else if (prev == hand)
463 break;
464
465
466
467 Ref dead = hand;
468 hand = hand.next;
469 prev.next = hand;
470 dead.next = null;
471 dead.value = null;
472 live -= dead.size;
473 getStat(liveBytes, dead.key).addAndGet(-dead.size);
474 getStat(statEvict, dead.key).incrementAndGet();
475 reportIndexEvicted(dead);
476 } while (maxBytes < live);
477 clockHand = prev;
478 }
479 getStat(liveBytes, key).addAndGet(reserve);
480 } finally {
481 clockLock.unlock();
482 }
483 }
484
485 private void creditSpace(long credit, DfsStreamKey key) {
486 clockLock.lock();
487 try {
488 getStat(liveBytes, key).addAndGet(-credit);
489 } finally {
490 clockLock.unlock();
491 }
492 }
493
494 @SuppressWarnings("unchecked")
495 private void addToClock(Ref ref, long credit) {
496 clockLock.lock();
497 try {
498 if (credit != 0) {
499 getStat(liveBytes, ref.key).addAndGet(-credit);
500 }
501 Ref ptr = clockHand;
502 ref.next = ptr.next;
503 ptr.next = ref;
504 clockHand = ref;
505 } finally {
506 clockLock.unlock();
507 }
508 }
509
510 void put(DfsBlock v) {
511 put(v.stream, v.start, v.size(), v);
512 }
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527 <T> Ref<T> getOrLoadRef(
528 DfsStreamKey key, long position, RefLoader<T> loader)
529 throws IOException {
530 long start = System.nanoTime();
531 int slot = slot(key, position);
532 HashEntry e1 = table.get(slot);
533 Ref<T> ref = scanRef(e1, key, position);
534 if (ref != null) {
535 getStat(statHit, key).incrementAndGet();
536 reportIndexRequested(ref, true , start);
537 return ref;
538 }
539
540 ReentrantLock regionLock = lockForRef(key);
541 long lockStart = System.currentTimeMillis();
542 regionLock.lock();
543 try {
544 HashEntry e2 = table.get(slot);
545 if (e2 != e1) {
546 ref = scanRef(e2, key, position);
547 if (ref != null) {
548 getStat(statHit, key).incrementAndGet();
549 reportIndexRequested(ref, true ,
550 start);
551 return ref;
552 }
553 }
554
555 if (refLockWaitTime != null) {
556 refLockWaitTime.accept(
557 Long.valueOf(System.currentTimeMillis() - lockStart));
558 }
559 getStat(statMiss, key).incrementAndGet();
560 ref = loader.load();
561 ref.markHotter();
562
563 reserveSpace(ref.size, key);
564 for (;;) {
565 HashEntry n = new HashEntry(clean(e2), ref);
566 if (table.compareAndSet(slot, e2, n)) {
567 break;
568 }
569 e2 = table.get(slot);
570 }
571 addToClock(ref, 0);
572 } finally {
573 regionLock.unlock();
574 }
575 reportIndexRequested(ref, false , start);
576 return ref;
577 }
578
579 <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
580 return put(key, 0, size, v);
581 }
582
583 <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
584 int slot = slot(key, pos);
585 HashEntry e1 = table.get(slot);
586 Ref<T> ref = scanRef(e1, key, pos);
587 if (ref != null) {
588 return ref;
589 }
590
591 reserveSpace(size, key);
592 ReentrantLock regionLock = lockFor(key, pos);
593 regionLock.lock();
594 try {
595 HashEntry e2 = table.get(slot);
596 if (e2 != e1) {
597 ref = scanRef(e2, key, pos);
598 if (ref != null) {
599 creditSpace(size, key);
600 return ref;
601 }
602 }
603
604 ref = new Ref<>(key, pos, size, v);
605 ref.markHotter();
606 for (;;) {
607 HashEntry n = new HashEntry(clean(e2), ref);
608 if (table.compareAndSet(slot, e2, n)) {
609 break;
610 }
611 e2 = table.get(slot);
612 }
613 addToClock(ref, 0);
614 } finally {
615 regionLock.unlock();
616 }
617 return ref;
618 }
619
620 boolean contains(DfsStreamKey key, long position) {
621 return scan(table.get(slot(key, position)), key, position) != null;
622 }
623
624 @SuppressWarnings("unchecked")
625 <T> T get(DfsStreamKey key, long position) {
626 T val = (T) scan(table.get(slot(key, position)), key, position);
627 if (val == null) {
628 getStat(statMiss, key).incrementAndGet();
629 } else {
630 getStat(statHit, key).incrementAndGet();
631 }
632 return val;
633 }
634
635 private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
636 Ref<T> r = scanRef(n, key, position);
637 return r != null ? r.get() : null;
638 }
639
640 @SuppressWarnings("unchecked")
641 private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
642 for (; n != null; n = n.next) {
643 Ref<T> r = n.ref;
644 if (r.position == position && r.key.equals(key)) {
645 return r.get() != null ? r : null;
646 }
647 }
648 return null;
649 }
650
651 private int slot(DfsStreamKey key, long position) {
652 return (hash(key.hash, position) >>> 1) % tableSize;
653 }
654
655 private ReentrantLock lockFor(DfsStreamKey key, long position) {
656 return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
657 }
658
659 private ReentrantLock lockForRef(DfsStreamKey key) {
660 int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length;
661 return refLocks[key.packExtPos][slot];
662 }
663
664 private static AtomicLong[] newCounters() {
665 AtomicLong[] ret = new AtomicLong[PackExt.values().length];
666 for (int i = 0; i < ret.length; i++) {
667 ret[i] = new AtomicLong();
668 }
669 return ret;
670 }
671
672 private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
673 DfsStreamKey key) {
674 int pos = key.packExtPos;
675 while (true) {
676 AtomicLong[] vals = stats.get();
677 if (pos < vals.length) {
678 return vals[pos];
679 }
680 AtomicLong[] expect = vals;
681 vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
682 System.arraycopy(expect, 0, vals, 0, expect.length);
683 for (int i = expect.length; i < vals.length; i++) {
684 vals[i] = new AtomicLong();
685 }
686 if (stats.compareAndSet(expect, vals)) {
687 return vals[pos];
688 }
689 }
690 }
691
692 private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
693 AtomicLong[] stats = stat.get();
694 long[] cnt = new long[stats.length];
695 for (int i = 0; i < stats.length; i++) {
696 cnt[i] = stats[i].get();
697 }
698 return cnt;
699 }
700
701 private static HashEntry clean(HashEntry top) {
702 while (top != null && top.ref.next == null) {
703 top = top.next;
704 }
705 if (top == null) {
706 return null;
707 }
708 HashEntry n = clean(top.next);
709 return n == top.next ? top : new HashEntry(n, top.ref);
710 }
711
712 private void reportIndexRequested(Ref<?> ref, boolean cacheHit,
713 long start) {
714 if (indexEventConsumer == null
715 || !isIndexOrBitmapExtPos(ref.key.packExtPos)) {
716 return;
717 }
718 EvictKey evictKey = new EvictKey(ref);
719 Long prevEvictedTime = indexEvictionMap.get(evictKey);
720 long now = System.nanoTime();
721 long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
722 : now - prevEvictedTime.longValue();
723 indexEventConsumer.acceptRequestedEvent(ref.key.packExtPos, cacheHit,
724 (now - start) / 1000L , ref.size,
725 Duration.ofNanos(sinceLastEvictionNanos));
726 }
727
728 private void reportIndexEvicted(Ref<?> dead) {
729 if (indexEventConsumer == null
730 || !indexEventConsumer.shouldReportEvictedEvent()
731 || !isIndexOrBitmapExtPos(dead.key.packExtPos)) {
732 return;
733 }
734 EvictKey evictKey = new EvictKey(dead);
735 Long prevEvictedTime = indexEvictionMap.get(evictKey);
736 long now = System.nanoTime();
737 long sinceLastEvictionNanos = prevEvictedTime == null ? 0L
738 : now - prevEvictedTime.longValue();
739 indexEvictionMap.put(evictKey, Long.valueOf(now));
740 indexEventConsumer.acceptEvictedEvent(dead.key.packExtPos, dead.size,
741 dead.totalHitCount.get(),
742 Duration.ofNanos(sinceLastEvictionNanos));
743 }
744
745 private static boolean isIndexOrBitmapExtPos(int packExtPos) {
746 return packExtPos == PackExt.INDEX.getPosition()
747 || packExtPos == PackExt.BITMAP_INDEX.getPosition();
748 }
749
750 private static final class HashEntry {
751
752 final HashEntry next;
753
754
755 final Ref ref;
756
757 HashEntry(HashEntry n, Ref r) {
758 next = n;
759 ref = r;
760 }
761 }
762
763 static final class Ref<T> {
764 final DfsStreamKey key;
765 final long position;
766 final long size;
767 volatile T value;
768 Ref next;
769
770 private volatile int hotCount;
771 private AtomicInteger totalHitCount = new AtomicInteger();
772
773 Ref(DfsStreamKey key, long position, long size, T v) {
774 this.key = key;
775 this.position = position;
776 this.size = size;
777 this.value = v;
778 }
779
780 T get() {
781 T v = value;
782 if (v != null) {
783 markHotter();
784 }
785 return v;
786 }
787
788 boolean has() {
789 return value != null;
790 }
791
792 void markHotter() {
793 int cap = DfsBlockCache
794 .getInstance().cacheHotLimits[key.packExtPos];
795 hotCount = Math.min(cap, hotCount + 1);
796 totalHitCount.incrementAndGet();
797 }
798
799 void markColder() {
800 hotCount = Math.max(0, hotCount - 1);
801 }
802
803 boolean isHot() {
804 return hotCount > 0;
805 }
806 }
807
808 private static final class EvictKey {
809 private final int keyHash;
810 private final int packExtPos;
811 private final long position;
812
813 EvictKey(Ref<?> ref) {
814 keyHash = ref.key.hash;
815 packExtPos = ref.key.packExtPos;
816 position = ref.position;
817 }
818
819 @Override
820 public boolean equals(Object object) {
821 if (object instanceof EvictKey) {
822 EvictKey other = (EvictKey) object;
823 return keyHash == other.keyHash
824 && packExtPos == other.packExtPos
825 && position == other.position;
826 }
827 return false;
828 }
829
830 @Override
831 public int hashCode() {
832 return DfsBlockCache.getInstance().hash(keyHash, position);
833 }
834 }
835
836 @FunctionalInterface
837 interface RefLoader<T> {
838 Ref<T> load() throws IOException;
839 }
840
841
842
843
844 @FunctionalInterface
845 interface ReadableChannelSupplier {
846
847
848
849
850 ReadableChannel get() throws IOException;
851 }
852 }