1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45 package org.eclipse.jgit.internal.storage.dfs;
46
47 import java.io.IOException;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.atomic.AtomicReference;
50 import java.util.concurrent.atomic.AtomicReferenceArray;
51 import java.util.concurrent.locks.ReentrantLock;
52 import java.util.function.Consumer;
53 import java.util.stream.LongStream;
54
55 import org.eclipse.jgit.internal.JGitText;
56 import org.eclipse.jgit.internal.storage.pack.PackExt;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public final class DfsBlockCache {
94 private static volatile DfsBlockCache cache;
95
96 static {
97 reconfigure(new DfsBlockCacheConfig());
98 }
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public static void reconfigure(DfsBlockCacheConfig cfg) {
113 cache = new DfsBlockCache(cfg);
114 }
115
116
117
118
119
120
121 public static DfsBlockCache getInstance() {
122 return cache;
123 }
124
125
126 private final int tableSize;
127
128
129 private final AtomicReferenceArray<HashEntry> table;
130
131
132
133
134
135
136 private final ReentrantLock[] loadLocks;
137
138
139
140
141 private final ReentrantLock[] refLocks;
142
143
144 private final long maxBytes;
145
146
147 private final long maxStreamThroughCache;
148
149
150
151
152
153
154
155
156
157
158
159 private final int blockSize;
160
161
162 private final int blockSizeShift;
163
164
165
166
167 private final AtomicReference<AtomicLong[]> statHit;
168
169
170
171
172
173 private final AtomicReference<AtomicLong[]> statMiss;
174
175
176
177
178
179 private final AtomicReference<AtomicLong[]> statEvict;
180
181
182
183
184 private final AtomicReference<AtomicLong[]> liveBytes;
185
186
187 private final ReentrantLock clockLock;
188
189
190
191
192 private final Consumer<Long> refLockWaitTime;
193
194
195 private Ref clockHand;
196
197 @SuppressWarnings("unchecked")
198 private DfsBlockCache(DfsBlockCacheConfig cfg) {
199 tableSize = tableSize(cfg);
200 if (tableSize < 1) {
201 throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
202 }
203
204 table = new AtomicReferenceArray<>(tableSize);
205 loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
206 for (int i = 0; i < loadLocks.length; i++) {
207 loadLocks[i] = new ReentrantLock(true );
208 }
209 refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
210 for (int i = 0; i < refLocks.length; i++) {
211 refLocks[i] = new ReentrantLock(true );
212 }
213
214 maxBytes = cfg.getBlockLimit();
215 maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
216 blockSize = cfg.getBlockSize();
217 blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
218
219 clockLock = new ReentrantLock(true );
220 String none = "";
221 clockHand = new Ref<>(
222 DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
223 -1, 0, null);
224 clockHand.next = clockHand;
225
226 statHit = new AtomicReference<>(newCounters());
227 statMiss = new AtomicReference<>(newCounters());
228 statEvict = new AtomicReference<>(newCounters());
229 liveBytes = new AtomicReference<>(newCounters());
230
231 refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
232 }
233
234 boolean shouldCopyThroughCache(long length) {
235 return length <= maxStreamThroughCache;
236 }
237
238
239
240
241
242
243 public long[] getCurrentSize() {
244 return getStatVals(liveBytes);
245 }
246
247
248
249
250
251
252 public long getFillPercentage() {
253 return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
254 }
255
256
257
258
259
260
261
262 public long[] getHitCount() {
263 return getStatVals(statHit);
264 }
265
266
267
268
269
270
271
272
273 public long[] getMissCount() {
274 return getStatVals(statMiss);
275 }
276
277
278
279
280
281
282 public long[] getTotalRequestCount() {
283 AtomicLong[] hit = statHit.get();
284 AtomicLong[] miss = statMiss.get();
285 long[] cnt = new long[Math.max(hit.length, miss.length)];
286 for (int i = 0; i < hit.length; i++) {
287 cnt[i] += hit[i].get();
288 }
289 for (int i = 0; i < miss.length; i++) {
290 cnt[i] += miss[i].get();
291 }
292 return cnt;
293 }
294
295
296
297
298
299
300 public long[] getHitRatio() {
301 AtomicLong[] hit = statHit.get();
302 AtomicLong[] miss = statMiss.get();
303 long[] ratio = new long[Math.max(hit.length, miss.length)];
304 for (int i = 0; i < ratio.length; i++) {
305 if (i >= hit.length) {
306 ratio[i] = 0;
307 } else if (i >= miss.length) {
308 ratio[i] = 100;
309 } else {
310 long hitVal = hit[i].get();
311 long missVal = miss[i].get();
312 long total = hitVal + missVal;
313 ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
314 }
315 }
316 return ratio;
317 }
318
319
320
321
322
323
324
325
326 public long[] getEvictions() {
327 return getStatVals(statEvict);
328 }
329
330
331
332
333
334
335
336
337
338
339
340
341 public boolean hasBlock0(DfsStreamKey key) {
342 HashEntry e1 = table.get(slot(key, 0));
343 DfsBlock v = scan(e1, key, 0);
344 return v != null && v.contains(key, 0);
345 }
346
347 private int hash(int packHash, long off) {
348 return packHash + (int) (off >>> blockSizeShift);
349 }
350
351 int getBlockSize() {
352 return blockSize;
353 }
354
355 private static int tableSize(DfsBlockCacheConfig cfg) {
356 final int wsz = cfg.getBlockSize();
357 final long limit = cfg.getBlockLimit();
358 if (wsz <= 0) {
359 throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
360 }
361 if (limit < wsz) {
362 throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
363 }
364 return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
365 }
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382 DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
383 ReadableChannelSupplier fileChannel) throws IOException {
384 final long requestedPosition = position;
385 position = file.alignToBlock(position);
386
387 DfsStreamKey key = file.key;
388 int slot = slot(key, position);
389 HashEntry e1 = table.get(slot);
390 DfsBlock v = scan(e1, key, position);
391 if (v != null && v.contains(key, requestedPosition)) {
392 ctx.stats.blockCacheHit++;
393 getStat(statHit, key).incrementAndGet();
394 return v;
395 }
396
397 reserveSpace(blockSize, key);
398 ReentrantLock regionLock = lockFor(key, position);
399 regionLock.lock();
400 try {
401 HashEntry e2 = table.get(slot);
402 if (e2 != e1) {
403 v = scan(e2, key, position);
404 if (v != null) {
405 ctx.stats.blockCacheHit++;
406 getStat(statHit, key).incrementAndGet();
407 creditSpace(blockSize, key);
408 return v;
409 }
410 }
411
412 getStat(statMiss, key).incrementAndGet();
413 boolean credit = true;
414 try {
415 v = file.readOneBlock(requestedPosition, ctx,
416 fileChannel.get());
417 credit = false;
418 } finally {
419 if (credit) {
420 creditSpace(blockSize, key);
421 }
422 }
423 if (position != v.start) {
424
425 position = v.start;
426 slot = slot(key, position);
427 e2 = table.get(slot);
428 }
429
430 Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
431 ref.hot = true;
432 for (;;) {
433 HashEntry n = new HashEntry(clean(e2), ref);
434 if (table.compareAndSet(slot, e2, n)) {
435 break;
436 }
437 e2 = table.get(slot);
438 }
439 addToClock(ref, blockSize - v.size());
440 } finally {
441 regionLock.unlock();
442 }
443
444
445
446 if (v.contains(file.key, requestedPosition)) {
447 return v;
448 }
449 return getOrLoad(file, requestedPosition, ctx, fileChannel);
450 }
451
452 @SuppressWarnings("unchecked")
453 private void reserveSpace(int reserve, DfsStreamKey key) {
454 clockLock.lock();
455 try {
456 long live = LongStream.of(getCurrentSize()).sum() + reserve;
457 if (maxBytes < live) {
458 Ref prev = clockHand;
459 Ref hand = clockHand.next;
460 do {
461 if (hand.hot) {
462
463
464 hand.hot = false;
465 prev = hand;
466 hand = hand.next;
467 continue;
468 } else if (prev == hand)
469 break;
470
471
472
473 Ref dead = hand;
474 hand = hand.next;
475 prev.next = hand;
476 dead.next = null;
477 dead.value = null;
478 live -= dead.size;
479 getStat(liveBytes, dead.key).addAndGet(-dead.size);
480 getStat(statEvict, dead.key).incrementAndGet();
481 } while (maxBytes < live);
482 clockHand = prev;
483 }
484 getStat(liveBytes, key).addAndGet(reserve);
485 } finally {
486 clockLock.unlock();
487 }
488 }
489
490 private void creditSpace(int credit, DfsStreamKey key) {
491 clockLock.lock();
492 try {
493 getStat(liveBytes, key).addAndGet(-credit);
494 } finally {
495 clockLock.unlock();
496 }
497 }
498
499 @SuppressWarnings("unchecked")
500 private void addToClock(Ref ref, int credit) {
501 clockLock.lock();
502 try {
503 if (credit != 0) {
504 getStat(liveBytes, ref.key).addAndGet(-credit);
505 }
506 Ref ptr = clockHand;
507 ref.next = ptr.next;
508 ptr.next = ref;
509 clockHand = ref;
510 } finally {
511 clockLock.unlock();
512 }
513 }
514
515 void put(DfsBlock v) {
516 put(v.stream, v.start, v.size(), v);
517 }
518
519
520
521
522
523
524
525
526
527
528
529
530 <T> Ref<T> getOrLoadRef(DfsStreamKey key, RefLoader<T> loader)
531 throws IOException {
532 int slot = slot(key, 0);
533 HashEntry e1 = table.get(slot);
534 Ref<T> ref = scanRef(e1, key, 0);
535 if (ref != null) {
536 getStat(statHit, key).incrementAndGet();
537 return ref;
538 }
539
540 ReentrantLock regionLock = lockForRef(key);
541 long lockStart = System.currentTimeMillis();
542 regionLock.lock();
543 try {
544 HashEntry e2 = table.get(slot);
545 if (e2 != e1) {
546 ref = scanRef(e2, key, 0);
547 if (ref != null) {
548 getStat(statHit, key).incrementAndGet();
549 return ref;
550 }
551 }
552
553 if (refLockWaitTime != null) {
554 refLockWaitTime.accept(
555 Long.valueOf(System.currentTimeMillis() - lockStart));
556 }
557 getStat(statMiss, key).incrementAndGet();
558 ref = loader.load();
559 ref.hot = true;
560
561 reserveSpace(ref.size, key);
562 for (;;) {
563 HashEntry n = new HashEntry(clean(e2), ref);
564 if (table.compareAndSet(slot, e2, n)) {
565 break;
566 }
567 e2 = table.get(slot);
568 }
569 addToClock(ref, 0);
570 } finally {
571 regionLock.unlock();
572 }
573 return ref;
574 }
575
576 <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
577 return put(key, 0, (int) Math.min(size, Integer.MAX_VALUE), v);
578 }
579
580 <T> Ref<T> put(DfsStreamKey key, long pos, int size, T v) {
581 int slot = slot(key, pos);
582 HashEntry e1 = table.get(slot);
583 Ref<T> ref = scanRef(e1, key, pos);
584 if (ref != null) {
585 return ref;
586 }
587
588 reserveSpace(size, key);
589 ReentrantLock regionLock = lockFor(key, pos);
590 regionLock.lock();
591 try {
592 HashEntry e2 = table.get(slot);
593 if (e2 != e1) {
594 ref = scanRef(e2, key, pos);
595 if (ref != null) {
596 creditSpace(size, key);
597 return ref;
598 }
599 }
600
601 ref = new Ref<>(key, pos, size, v);
602 ref.hot = true;
603 for (;;) {
604 HashEntry n = new HashEntry(clean(e2), ref);
605 if (table.compareAndSet(slot, e2, n)) {
606 break;
607 }
608 e2 = table.get(slot);
609 }
610 addToClock(ref, 0);
611 } finally {
612 regionLock.unlock();
613 }
614 return ref;
615 }
616
617 boolean contains(DfsStreamKey key, long position) {
618 return scan(table.get(slot(key, position)), key, position) != null;
619 }
620
621 @SuppressWarnings("unchecked")
622 <T> T get(DfsStreamKey key, long position) {
623 T val = (T) scan(table.get(slot(key, position)), key, position);
624 if (val == null) {
625 getStat(statMiss, key).incrementAndGet();
626 } else {
627 getStat(statHit, key).incrementAndGet();
628 }
629 return val;
630 }
631
632 private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
633 Ref<T> r = scanRef(n, key, position);
634 return r != null ? r.get() : null;
635 }
636
637 @SuppressWarnings("unchecked")
638 private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
639 for (; n != null; n = n.next) {
640 Ref<T> r = n.ref;
641 if (r.position == position && r.key.equals(key)) {
642 return r.get() != null ? r : null;
643 }
644 }
645 return null;
646 }
647
648 private int slot(DfsStreamKey key, long position) {
649 return (hash(key.hash, position) >>> 1) % tableSize;
650 }
651
652 private ReentrantLock lockFor(DfsStreamKey key, long position) {
653 return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
654 }
655
656 private ReentrantLock lockForRef(DfsStreamKey key) {
657 return refLocks[(key.hash >>> 1) % refLocks.length];
658 }
659
660 private static AtomicLong[] newCounters() {
661 AtomicLong[] ret = new AtomicLong[PackExt.values().length];
662 for (int i = 0; i < ret.length; i++) {
663 ret[i] = new AtomicLong();
664 }
665 return ret;
666 }
667
668 private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
669 DfsStreamKey key) {
670 int pos = key.packExtPos;
671 while (true) {
672 AtomicLong[] vals = stats.get();
673 if (pos < vals.length) {
674 return vals[pos];
675 }
676 AtomicLong[] expect = vals;
677 vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
678 System.arraycopy(expect, 0, vals, 0, expect.length);
679 for (int i = expect.length; i < vals.length; i++) {
680 vals[i] = new AtomicLong();
681 }
682 if (stats.compareAndSet(expect, vals)) {
683 return vals[pos];
684 }
685 }
686 }
687
688 private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
689 AtomicLong[] stats = stat.get();
690 long[] cnt = new long[stats.length];
691 for (int i = 0; i < stats.length; i++) {
692 cnt[i] = stats[i].get();
693 }
694 return cnt;
695 }
696
697 private static HashEntry clean(HashEntry top) {
698 while (top != null && top.ref.next == null)
699 top = top.next;
700 if (top == null) {
701 return null;
702 }
703 HashEntry n = clean(top.next);
704 return n == top.next ? top : new HashEntry(n, top.ref);
705 }
706
707 private static final class HashEntry {
708
709 final HashEntry next;
710
711
712 final Ref ref;
713
714 HashEntry(HashEntry n, Ref r) {
715 next = n;
716 ref = r;
717 }
718 }
719
720 static final class Ref<T> {
721 final DfsStreamKey key;
722 final long position;
723 final int size;
724 volatile T value;
725 Ref next;
726 volatile boolean hot;
727
728 Ref(DfsStreamKey key, long position, int size, T v) {
729 this.key = key;
730 this.position = position;
731 this.size = size;
732 this.value = v;
733 }
734
735 T get() {
736 T v = value;
737 if (v != null) {
738 hot = true;
739 }
740 return v;
741 }
742
743 boolean has() {
744 return value != null;
745 }
746 }
747
748 @FunctionalInterface
749 interface RefLoader<T> {
750 Ref<T> load() throws IOException;
751 }
752
753
754
755
756 @FunctionalInterface
757 interface ReadableChannelSupplier {
758
759
760
761
762 ReadableChannel get() throws IOException;
763 }
764 }