1
2
3
4
5
6
7
8
9
10
11
12 package org.eclipse.jgit.internal.storage.dfs;
13
14 import java.io.IOException;
15 import java.util.concurrent.atomic.AtomicLong;
16 import java.util.concurrent.atomic.AtomicReference;
17 import java.util.concurrent.atomic.AtomicReferenceArray;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.function.Consumer;
20 import java.util.stream.LongStream;
21
22 import org.eclipse.jgit.internal.JGitText;
23 import org.eclipse.jgit.internal.storage.pack.PackExt;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public final class DfsBlockCache {
/** Singleton cache instance; replaced atomically by {@link #reconfigure}. */
private static volatile DfsBlockCache cache;

static {
	// Install a cache with default settings at class load time so
	// getInstance() never returns null.
	reconfigure(new DfsBlockCacheConfig());
}
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * Modify the configuration of the window cache.
 * <p>
 * The new configuration is applied immediately. Blocks cached by the
 * previous instance are discarded together with it.
 *
 * @param cfg
 *            the new window cache configuration.
 */
public static void reconfigure(DfsBlockCacheConfig cfg) {
	cache = new DfsBlockCache(cfg);
}
83
84
85
86
87
88
/**
 * Get the currently active DfsBlockCache.
 *
 * @return the currently active DfsBlockCache.
 */
public static DfsBlockCache getInstance() {
	return cache;
}
92
93
/** Number of slots in {@link #table}. */
private final int tableSize;

/** Hash bucket directory; each slot holds a chain of {@link HashEntry}. */
private final AtomicReferenceArray<HashEntry> table;

/**
 * Locks to prevent concurrent loads of the same (stream, position) block.
 * Striped by hash; the stripe count is the configured concurrency level.
 */
private final ReentrantLock[] loadLocks;

/**
 * A separate pool of locks per pack extension, used by
 * {@link #getOrLoadRef} to serialize loads of non-block references.
 */
private final ReentrantLock[][] refLocks;

/** Maximum number of bytes the cache should hold. */
private final long maxBytes;

/** Streams at most this many bytes may be copied through the cache. */
private final long maxStreamThroughCache;

/** Size in bytes of a single block read from pack files. */
private final int blockSize;

/**
 * Shift equivalent to dividing by {@link #blockSize}; computed via
 * {@code Integer.numberOfTrailingZeros(blockSize)}.
 * assumes blockSize is a power of two — TODO confirm against
 * DfsBlockCacheConfig.getBlockSize().
 */
private final int blockSizeShift;

/** Cache-hit counters, one per pack extension position. */
private final AtomicReference<AtomicLong[]> statHit;

/** Cache-miss counters, one per pack extension position. */
private final AtomicReference<AtomicLong[]> statMiss;

/** Eviction counters, one per pack extension position. */
private final AtomicReference<AtomicLong[]> statEvict;

/** Bytes currently held live in the cache, per pack extension position. */
private final AtomicReference<AtomicLong[]> liveBytes;

/** Guards the clock list ({@link #clockHand} and all Ref.next links). */
private final ReentrantLock clockLock;

/**
 * Consumer of the time (in milliseconds) spent waiting on a ref lock in
 * {@link #getOrLoadRef}; may be null (checked before use).
 */
private final Consumer<Long> refLockWaitTime;

/** Current position of the clock hand; guarded by {@link #clockLock}. */
private Ref clockHand;

/** Upper bound on a Ref's hotCount, per pack extension position. */
private final int[] cacheHotLimits = new int[PackExt.values().length];
168
@SuppressWarnings("unchecked")
private DfsBlockCache(DfsBlockCacheConfig cfg) {
	tableSize = tableSize(cfg);
	if (tableSize < 1) {
		throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
	}

	table = new AtomicReferenceArray<>(tableSize);
	int concurrencyLevel = cfg.getConcurrencyLevel();
	loadLocks = new ReentrantLock[concurrencyLevel];
	for (int i = 0; i < loadLocks.length; i++) {
		loadLocks[i] = new ReentrantLock(true /* fair */);
	}
	// One lock stripe array per pack extension for reference loads.
	refLocks = new ReentrantLock[PackExt.values().length][concurrencyLevel];
	for (int i = 0; i < PackExt.values().length; i++) {
		for (int j = 0; j < concurrencyLevel; ++j) {
			refLocks[i][j] = new ReentrantLock(true /* fair */);
		}
	}

	maxBytes = cfg.getBlockLimit();
	maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
	blockSize = cfg.getBlockSize();
	blockSizeShift = Integer.numberOfTrailingZeros(blockSize);

	clockLock = new ReentrantLock(true /* fair */);
	// Seed the clock with a dummy ref so the circular list is never empty.
	String none = ""; //$NON-NLS-1$
	clockHand = new Ref<>(
			DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
			-1, 0, null);
	clockHand.next = clockHand; // single-element circular list

	statHit = new AtomicReference<>(newCounters());
	statMiss = new AtomicReference<>(newCounters());
	statEvict = new AtomicReference<>(newCounters());
	liveBytes = new AtomicReference<>(newCounters());

	refLockWaitTime = cfg.getRefLockWaitTimeConsumer();

	// Per-extension hot limits; fall back to the default when unset or
	// non-positive.
	for (int i = 0; i < PackExt.values().length; ++i) {
		Integer limit = cfg.getCacheHotMap().get(PackExt.values()[i]);
		if (limit != null && limit.intValue() > 0) {
			cacheHotLimits[i] = limit.intValue();
		} else {
			cacheHotLimits[i] = DfsBlockCacheConfig.DEFAULT_CACHE_HOT_MAX;
		}
	}
}
217
218 boolean shouldCopyThroughCache(long length) {
219 return length <= maxStreamThroughCache;
220 }
221
222
223
224
225
226
/**
 * Get total number of bytes in the cache, per pack file extension.
 *
 * @return total number of bytes in the cache, per pack file extension.
 */
public long[] getCurrentSize() {
	return getStatVals(liveBytes);
}
230
231
232
233
234
235
236 public long getFillPercentage() {
237 return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
238 }
239
240
241
242
243
244
245
/**
 * Get number of requests for items in the cache, per pack file extension.
 *
 * @return number of cache hits, per pack file extension.
 */
public long[] getHitCount() {
	return getStatVals(statHit);
}
249
250
251
252
253
254
255
256
/**
 * Get number of requests for items not in the cache, per pack file
 * extension.
 *
 * @return number of cache misses, per pack file extension.
 */
public long[] getMissCount() {
	return getStatVals(statMiss);
}
260
261
262
263
264
265
266 public long[] getTotalRequestCount() {
267 AtomicLong[] hit = statHit.get();
268 AtomicLong[] miss = statMiss.get();
269 long[] cnt = new long[Math.max(hit.length, miss.length)];
270 for (int i = 0; i < hit.length; i++) {
271 cnt[i] += hit[i].get();
272 }
273 for (int i = 0; i < miss.length; i++) {
274 cnt[i] += miss[i].get();
275 }
276 return cnt;
277 }
278
279
280
281
282
283
284 public long[] getHitRatio() {
285 AtomicLong[] hit = statHit.get();
286 AtomicLong[] miss = statMiss.get();
287 long[] ratio = new long[Math.max(hit.length, miss.length)];
288 for (int i = 0; i < ratio.length; i++) {
289 if (i >= hit.length) {
290 ratio[i] = 0;
291 } else if (i >= miss.length) {
292 ratio[i] = 100;
293 } else {
294 long hitVal = hit[i].get();
295 long missVal = miss[i].get();
296 long total = hitVal + missVal;
297 ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
298 }
299 }
300 return ratio;
301 }
302
303
304
305
306
307
308
309
/**
 * Get number of evictions performed due to the cache being full, per pack
 * file extension.
 *
 * @return number of evictions, per pack file extension.
 */
public long[] getEvictions() {
	return getStatVals(statEvict);
}
313
314
315
316
317
318
319
320
321
322
323
324
325 public boolean hasBlock0(DfsStreamKey key) {
326 HashEntry e1 = table.get(slot(key, 0));
327 DfsBlock v = scan(e1, key, 0);
328 return v != null && v.contains(key, 0);
329 }
330
331 private int hash(int packHash, long off) {
332 return packHash + (int) (off >>> blockSizeShift);
333 }
334
/** @return block size of the cache, in bytes. */
int getBlockSize() {
	return blockSize;
}
338
339 private static int tableSize(DfsBlockCacheConfig cfg) {
340 final int wsz = cfg.getBlockSize();
341 final long limit = cfg.getBlockLimit();
342 if (wsz <= 0) {
343 throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
344 }
345 if (limit < wsz) {
346 throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
347 }
348 return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
349 }
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
/**
 * Look up a cached block, reading and caching it if it is absent.
 *
 * @param file
 *            the pack that "contains" the cached block.
 * @param position
 *            offset within the file of the requested data.
 * @param ctx
 *            current thread's reader; its statistics are updated.
 * @param fileChannel
 *            supplier for a channel to read {@code file} on a miss.
 * @return the block containing {@code position}.
 * @throws IOException
 *             the block was not in the cache and could not be read.
 */
DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
		ReadableChannelSupplier fileChannel) throws IOException {
	final long requestedPosition = position;
	position = file.alignToBlock(position);

	DfsStreamKey key = file.key;
	int slot = slot(key, position);
	HashEntry e1 = table.get(slot);
	DfsBlock v = scan(e1, key, position);
	if (v != null && v.contains(key, requestedPosition)) {
		// Fast path: block already cached.
		ctx.stats.blockCacheHit++;
		getStat(statHit, key).incrementAndGet();
		return v;
	}

	reserveSpace(blockSize, key);
	ReentrantLock regionLock = lockFor(key, position);
	regionLock.lock();
	try {
		HashEntry e2 = table.get(slot);
		if (e2 != e1) {
			// Another thread may have loaded the block while this one
			// waited on the region lock; re-scan before hitting disk.
			v = scan(e2, key, position);
			if (v != null) {
				ctx.stats.blockCacheHit++;
				getStat(statHit, key).incrementAndGet();
				creditSpace(blockSize, key);
				return v;
			}
		}

		getStat(statMiss, key).incrementAndGet();
		boolean credit = true;
		try {
			v = file.readOneBlock(position, ctx, fileChannel.get());
			credit = false;
		} finally {
			if (credit) {
				// Read failed; return the reserved bytes to the pool.
				creditSpace(blockSize, key);
			}
		}
		if (position != v.start) {
			// The file adjusted its block alignment; recompute the slot
			// for the block actually read.
			position = v.start;
			slot = slot(key, position);
			e2 = table.get(slot);
		}

		Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
		ref.markHotter();
		// CAS-install a new chain head; retry if the slot changed.
		for (;;) {
			HashEntry n = new HashEntry(clean(e2), ref);
			if (table.compareAndSet(slot, e2, n)) {
				break;
			}
			e2 = table.get(slot);
		}
		// Credit back the difference if the block is smaller than the
		// blockSize reserved up front.
		addToClock(ref, blockSize - v.size());
	} finally {
		regionLock.unlock();
	}

	// The block read may not cover the originally requested position
	// (e.g. when the effective block size differed); retry from the top
	// with the original position.
	if (v.contains(file.key, requestedPosition)) {
		return v;
	}
	return getOrLoad(file, requestedPosition, ctx, fileChannel);
}
434
/**
 * Reserve {@code reserve} bytes of cache space for {@code key}, evicting
 * cold entries with a clock sweep if the cache would exceed
 * {@link #maxBytes}.
 */
@SuppressWarnings("unchecked")
private void reserveSpace(long reserve, DfsStreamKey key) {
	clockLock.lock();
	try {
		long live = LongStream.of(getCurrentSize()).sum() + reserve;
		if (maxBytes < live) {
			// Over budget: sweep the clock, cooling hot entries and
			// evicting cold ones until the reservation fits.
			Ref prev = clockHand;
			Ref hand = clockHand.next;
			do {
				if (hand.isHot()) {
					// Recently used; cool one step and give it another
					// chance instead of evicting.
					hand.markColder();
					prev = hand;
					hand = hand.next;
					continue;
				} else if (prev == hand)
					break; // only one entry left; nothing to evict

				// Cold entry: unlink it from the clock and drop its
				// value so the GC can reclaim the block.
				Ref dead = hand;
				hand = hand.next;
				prev.next = hand;
				dead.next = null;
				dead.value = null;
				live -= dead.size;
				getStat(liveBytes, dead.key).addAndGet(-dead.size);
				getStat(statEvict, dead.key).incrementAndGet();
			} while (maxBytes < live);
			clockHand = prev;
		}
		getStat(liveBytes, key).addAndGet(reserve);
	} finally {
		clockLock.unlock();
	}
}
472
/**
 * Return {@code credit} bytes previously taken by
 * {@link #reserveSpace(long, DfsStreamKey)} back to the pool, e.g. when a
 * load failed or another thread won the race to cache the value.
 */
private void creditSpace(long credit, DfsStreamKey key) {
	clockLock.lock();
	try {
		getStat(liveBytes, key).addAndGet(-credit);
	} finally {
		clockLock.unlock();
	}
}
481
/**
 * Insert {@code ref} into the clock's circular list, optionally crediting
 * back {@code credit} bytes that were over-reserved for it.
 */
@SuppressWarnings("unchecked")
private void addToClock(Ref ref, long credit) {
	clockLock.lock();
	try {
		if (credit != 0) {
			// Return the over-reserved bytes to the pool.
			getStat(liveBytes, ref.key).addAndGet(-credit);
		}
		// Splice the ref in just after the hand and make it the new
		// hand, so the next sweep examines it last.
		Ref ptr = clockHand;
		ref.next = ptr.next;
		ptr.next = ref;
		clockHand = ref;
	} finally {
		clockLock.unlock();
	}
}
497
/**
 * Cache a block, keyed by its stream and starting offset.
 *
 * @param v
 *            the block to cache.
 */
void put(DfsBlock v) {
	put(v.stream, v.start, v.size(), v);
}
501
502
503
504
505
506
507
508
509
510
511
512
513
514
/**
 * Look up a cached reference, invoking {@code loader} to produce it on a
 * miss. Loads for the same key are serialized through a per-extension
 * lock stripe.
 *
 * @param key
 *            the stream key of the pack.
 * @param position
 *            position within the stream the ref is cached under.
 * @param loader
 *            loads the reference when it is not in the cache.
 * @return the cached or newly loaded reference.
 * @throws IOException
 *             the reference was not in the cache and could not be loaded.
 */
<T> Ref<T> getOrLoadRef(
		DfsStreamKey key, long position, RefLoader<T> loader)
		throws IOException {
	int slot = slot(key, position);
	HashEntry e1 = table.get(slot);
	Ref<T> ref = scanRef(e1, key, position);
	if (ref != null) {
		// Fast path: already cached.
		getStat(statHit, key).incrementAndGet();
		return ref;
	}

	ReentrantLock regionLock = lockForRef(key);
	long lockStart = System.currentTimeMillis();
	regionLock.lock();
	try {
		HashEntry e2 = table.get(slot);
		if (e2 != e1) {
			// Another thread may have installed the ref while this one
			// waited on the lock.
			ref = scanRef(e2, key, position);
			if (ref != null) {
				getStat(statHit, key).incrementAndGet();
				return ref;
			}
		}

		// Report how long the lock was contended, if a consumer is set.
		if (refLockWaitTime != null) {
			refLockWaitTime.accept(
					Long.valueOf(System.currentTimeMillis() - lockStart));
		}
		getStat(statMiss, key).incrementAndGet();
		ref = loader.load();
		ref.markHotter();
		// Reserve after loading, since only then is the size known.
		reserveSpace(ref.size, key);
		// CAS-install a new chain head; retry on contention.
		for (;;) {
			HashEntry n = new HashEntry(clean(e2), ref);
			if (table.compareAndSet(slot, e2, n)) {
				break;
			}
			e2 = table.get(slot);
		}
		addToClock(ref, 0);
	} finally {
		regionLock.unlock();
	}
	return ref;
}
561
/**
 * Cache a value under position 0 of its stream key.
 *
 * @param key
 *            the stream key to cache under.
 * @param size
 *            size of the value in bytes, for cache accounting.
 * @param v
 *            the value to cache.
 * @return the ref now holding the value (possibly a pre-existing one).
 */
<T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
	return put(key, 0, size, v);
}
565
/**
 * Cache a value under (key, pos). If a live ref for the same key and
 * position already exists, it is kept and returned instead.
 *
 * @param key
 *            the stream key to cache under.
 * @param pos
 *            position within the stream.
 * @param size
 *            size of the value in bytes, for cache accounting.
 * @param v
 *            the value to cache.
 * @return the ref now holding the value (possibly a pre-existing one).
 */
<T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
	int slot = slot(key, pos);
	HashEntry e1 = table.get(slot);
	Ref<T> ref = scanRef(e1, key, pos);
	if (ref != null) {
		// Already cached; keep the existing ref.
		return ref;
	}

	reserveSpace(size, key);
	ReentrantLock regionLock = lockFor(key, pos);
	regionLock.lock();
	try {
		HashEntry e2 = table.get(slot);
		if (e2 != e1) {
			// Another thread may have inserted while this one waited;
			// if so, give back the reserved bytes.
			ref = scanRef(e2, key, pos);
			if (ref != null) {
				creditSpace(size, key);
				return ref;
			}
		}

		ref = new Ref<>(key, pos, size, v);
		ref.markHotter();
		// CAS-install a new chain head; retry on contention.
		for (;;) {
			HashEntry n = new HashEntry(clean(e2), ref);
			if (table.compareAndSet(slot, e2, n)) {
				break;
			}
			e2 = table.get(slot);
		}
		addToClock(ref, 0);
	} finally {
		regionLock.unlock();
	}
	return ref;
}
602
603 boolean contains(DfsStreamKey key, long position) {
604 return scan(table.get(slot(key, position)), key, position) != null;
605 }
606
607 @SuppressWarnings("unchecked")
608 <T> T get(DfsStreamKey key, long position) {
609 T val = (T) scan(table.get(slot(key, position)), key, position);
610 if (val == null) {
611 getStat(statMiss, key).incrementAndGet();
612 } else {
613 getStat(statHit, key).incrementAndGet();
614 }
615 return val;
616 }
617
618 private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
619 Ref<T> r = scanRef(n, key, position);
620 return r != null ? r.get() : null;
621 }
622
623 @SuppressWarnings("unchecked")
624 private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
625 for (; n != null; n = n.next) {
626 Ref<T> r = n.ref;
627 if (r.position == position && r.key.equals(key)) {
628 return r.get() != null ? r : null;
629 }
630 }
631 return null;
632 }
633
634 private int slot(DfsStreamKey key, long position) {
635 return (hash(key.hash, position) >>> 1) % tableSize;
636 }
637
638 private ReentrantLock lockFor(DfsStreamKey key, long position) {
639 return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
640 }
641
642 private ReentrantLock lockForRef(DfsStreamKey key) {
643 int slot = (key.hash >>> 1) % refLocks[key.packExtPos].length;
644 return refLocks[key.packExtPos][slot];
645 }
646
647 private static AtomicLong[] newCounters() {
648 AtomicLong[] ret = new AtomicLong[PackExt.values().length];
649 for (int i = 0; i < ret.length; i++) {
650 ret[i] = new AtomicLong();
651 }
652 return ret;
653 }
654
/**
 * Get the counter for {@code key}'s pack extension position from
 * {@code stats}, growing the counter array lock-free (via CAS) when the
 * position is beyond its current length.
 */
private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
		DfsStreamKey key) {
	int pos = key.packExtPos;
	while (true) {
		AtomicLong[] vals = stats.get();
		if (pos < vals.length) {
			return vals[pos];
		}
		// Grow: copy the existing counters, pad with fresh zeros, and
		// publish atomically; retry if another thread won the race.
		AtomicLong[] expect = vals;
		vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
		System.arraycopy(expect, 0, vals, 0, expect.length);
		for (int i = expect.length; i < vals.length; i++) {
			vals[i] = new AtomicLong();
		}
		if (stats.compareAndSet(expect, vals)) {
			return vals[pos];
		}
	}
}
674
675 private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
676 AtomicLong[] stats = stat.get();
677 long[] cnt = new long[stats.length];
678 for (int i = 0; i < stats.length; i++) {
679 cnt[i] = stats[i].get();
680 }
681 return cnt;
682 }
683
/**
 * Rebuild the chain starting at {@code top} without entries whose ref was
 * unlinked from the clock ({@code ref.next == null}, set during eviction
 * in reserveSpace). Unchanged tails are shared rather than copied.
 */
private static HashEntry clean(HashEntry top) {
	// Skip evicted entries at the head of the chain.
	while (top != null && top.ref.next == null)
		top = top.next;
	if (top == null) {
		return null;
	}
	// Recursively clean the tail; reuse this node when nothing changed.
	HashEntry n = clean(top.next);
	return n == top.next ? top : new HashEntry(n, top.ref);
}
693
/** Immutable node in a hash-bucket chain of {@link #table}. */
private static final class HashEntry {
	/** Next entry in the slot's chain list; null at the tail. */
	final HashEntry next;

	/** The referenced object. */
	final Ref ref;

	HashEntry(HashEntry n, Ref r) {
		next = n;
		ref = r;
	}
}
706
/** A cached value together with its clock-sweep bookkeeping. */
static final class Ref<T> {
	final DfsStreamKey key;
	final long position;
	final long size;
	// Cached value; cleared to null when this ref is evicted.
	volatile T value;
	// Link in the circular clock list; null once unlinked by eviction.
	Ref next;

	// "Second chance" counter for the clock sweep, clamped to the
	// per-extension limit; >0 means hot (skipped by eviction once).
	private volatile int hotCount;

	Ref(DfsStreamKey key, long position, long size, T v) {
		this.key = key;
		this.position = position;
		this.size = size;
		this.value = v;
	}

	// Returns the value (or null if evicted); a successful read also
	// heats the ref up.
	T get() {
		T v = value;
		if (v != null) {
			markHotter();
		}
		return v;
	}

	boolean has() {
		return value != null;
	}

	void markHotter() {
		// NOTE(review): reads the hot limit from the global singleton,
		// not necessarily the cache instance that owns this ref —
		// confirm this is intended if reconfigure() can race with
		// refs from an older instance.
		int cap = DfsBlockCache
				.getInstance().cacheHotLimits[key.packExtPos];
		hotCount = Math.min(cap, hotCount + 1);
	}

	void markColder() {
		hotCount = Math.max(0, hotCount - 1);
	}

	boolean isHot() {
		return hotCount > 0;
	}
}
749
/** Loader for a cached reference, invoked by getOrLoadRef on a miss. */
@FunctionalInterface
interface RefLoader<T> {
	Ref<T> load() throws IOException;
}
754
755
756
757
/**
 * Supplier for a readable channel, used to defer opening the channel
 * until a cache miss actually requires reading from storage.
 */
@FunctionalInterface
interface ReadableChannelSupplier {
	/**
	 * @return the channel.
	 * @throws IOException
	 *             if the channel could not be opened.
	 */
	ReadableChannel get() throws IOException;
}
766 }