1
2
3
4
5
6
7
8
9
10
11
12 package org.eclipse.jgit.internal.storage.dfs;
13
14 import java.io.IOException;
15 import java.util.concurrent.atomic.AtomicLong;
16 import java.util.concurrent.atomic.AtomicReference;
17 import java.util.concurrent.atomic.AtomicReferenceArray;
18 import java.util.concurrent.locks.ReentrantLock;
19 import java.util.function.Consumer;
20 import java.util.stream.LongStream;
21
22 import org.eclipse.jgit.internal.JGitText;
23 import org.eclipse.jgit.internal.storage.pack.PackExt;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 public final class DfsBlockCache {
61 private static volatile DfsBlockCache cache;
62
63 static {
64 reconfigure(new DfsBlockCacheConfig());
65 }
66
67
68
69
70
71
72
73
74
75
76
77
78
79 public static void reconfigure(DfsBlockCacheConfig cfg) {
80 cache = new DfsBlockCache(cfg);
81 }
82
83
84
85
86
87
88 public static DfsBlockCache getInstance() {
89 return cache;
90 }
91
92
93 private final int tableSize;
94
95
96 private final AtomicReferenceArray<HashEntry> table;
97
98
99
100
101
102
103 private final ReentrantLock[] loadLocks;
104
105
106
107
108 private final ReentrantLock[] refLocks;
109
110
111 private final long maxBytes;
112
113
114 private final long maxStreamThroughCache;
115
116
117
118
119
120
121
122
123
124
125
126 private final int blockSize;
127
128
129 private final int blockSizeShift;
130
131
132
133
134 private final AtomicReference<AtomicLong[]> statHit;
135
136
137
138
139
140 private final AtomicReference<AtomicLong[]> statMiss;
141
142
143
144
145
146 private final AtomicReference<AtomicLong[]> statEvict;
147
148
149
150
151 private final AtomicReference<AtomicLong[]> liveBytes;
152
153
154 private final ReentrantLock clockLock;
155
156
157
158
159 private final Consumer<Long> refLockWaitTime;
160
161
162 private Ref clockHand;
163
164 @SuppressWarnings("unchecked")
165 private DfsBlockCache(DfsBlockCacheConfig cfg) {
166 tableSize = tableSize(cfg);
167 if (tableSize < 1) {
168 throw new IllegalArgumentException(JGitText.get().tSizeMustBeGreaterOrEqual1);
169 }
170
171 table = new AtomicReferenceArray<>(tableSize);
172 loadLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
173 for (int i = 0; i < loadLocks.length; i++) {
174 loadLocks[i] = new ReentrantLock(true );
175 }
176 refLocks = new ReentrantLock[cfg.getConcurrencyLevel()];
177 for (int i = 0; i < refLocks.length; i++) {
178 refLocks[i] = new ReentrantLock(true );
179 }
180
181 maxBytes = cfg.getBlockLimit();
182 maxStreamThroughCache = (long) (maxBytes * cfg.getStreamRatio());
183 blockSize = cfg.getBlockSize();
184 blockSizeShift = Integer.numberOfTrailingZeros(blockSize);
185
186 clockLock = new ReentrantLock(true );
187 String none = "";
188 clockHand = new Ref<>(
189 DfsStreamKey.of(new DfsRepositoryDescription(none), none, null),
190 -1, 0, null);
191 clockHand.next = clockHand;
192
193 statHit = new AtomicReference<>(newCounters());
194 statMiss = new AtomicReference<>(newCounters());
195 statEvict = new AtomicReference<>(newCounters());
196 liveBytes = new AtomicReference<>(newCounters());
197
198 refLockWaitTime = cfg.getRefLockWaitTimeConsumer();
199 }
200
201 boolean shouldCopyThroughCache(long length) {
202 return length <= maxStreamThroughCache;
203 }
204
205
206
207
208
209
210 public long[] getCurrentSize() {
211 return getStatVals(liveBytes);
212 }
213
214
215
216
217
218
219 public long getFillPercentage() {
220 return LongStream.of(getCurrentSize()).sum() * 100 / maxBytes;
221 }
222
223
224
225
226
227
228
229 public long[] getHitCount() {
230 return getStatVals(statHit);
231 }
232
233
234
235
236
237
238
239
240 public long[] getMissCount() {
241 return getStatVals(statMiss);
242 }
243
244
245
246
247
248
249 public long[] getTotalRequestCount() {
250 AtomicLong[] hit = statHit.get();
251 AtomicLong[] miss = statMiss.get();
252 long[] cnt = new long[Math.max(hit.length, miss.length)];
253 for (int i = 0; i < hit.length; i++) {
254 cnt[i] += hit[i].get();
255 }
256 for (int i = 0; i < miss.length; i++) {
257 cnt[i] += miss[i].get();
258 }
259 return cnt;
260 }
261
262
263
264
265
266
267 public long[] getHitRatio() {
268 AtomicLong[] hit = statHit.get();
269 AtomicLong[] miss = statMiss.get();
270 long[] ratio = new long[Math.max(hit.length, miss.length)];
271 for (int i = 0; i < ratio.length; i++) {
272 if (i >= hit.length) {
273 ratio[i] = 0;
274 } else if (i >= miss.length) {
275 ratio[i] = 100;
276 } else {
277 long hitVal = hit[i].get();
278 long missVal = miss[i].get();
279 long total = hitVal + missVal;
280 ratio[i] = total == 0 ? 0 : hitVal * 100 / total;
281 }
282 }
283 return ratio;
284 }
285
286
287
288
289
290
291
292
293 public long[] getEvictions() {
294 return getStatVals(statEvict);
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308 public boolean hasBlock0(DfsStreamKey key) {
309 HashEntry e1 = table.get(slot(key, 0));
310 DfsBlock v = scan(e1, key, 0);
311 return v != null && v.contains(key, 0);
312 }
313
314 private int hash(int packHash, long off) {
315 return packHash + (int) (off >>> blockSizeShift);
316 }
317
318 int getBlockSize() {
319 return blockSize;
320 }
321
322 private static int tableSize(DfsBlockCacheConfig cfg) {
323 final int wsz = cfg.getBlockSize();
324 final long limit = cfg.getBlockLimit();
325 if (wsz <= 0) {
326 throw new IllegalArgumentException(JGitText.get().invalidWindowSize);
327 }
328 if (limit < wsz) {
329 throw new IllegalArgumentException(JGitText.get().windowSizeMustBeLesserThanLimit);
330 }
331 return (int) Math.min(5 * (limit / wsz) / 2, Integer.MAX_VALUE);
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349 DfsBlock getOrLoad(BlockBasedFile file, long position, DfsReader ctx,
350 ReadableChannelSupplier fileChannel) throws IOException {
351 final long requestedPosition = position;
352 position = file.alignToBlock(position);
353
354 DfsStreamKey key = file.key;
355 int slot = slot(key, position);
356 HashEntry e1 = table.get(slot);
357 DfsBlock v = scan(e1, key, position);
358 if (v != null && v.contains(key, requestedPosition)) {
359 ctx.stats.blockCacheHit++;
360 getStat(statHit, key).incrementAndGet();
361 return v;
362 }
363
364 reserveSpace(blockSize, key);
365 ReentrantLock regionLock = lockFor(key, position);
366 regionLock.lock();
367 try {
368 HashEntry e2 = table.get(slot);
369 if (e2 != e1) {
370 v = scan(e2, key, position);
371 if (v != null) {
372 ctx.stats.blockCacheHit++;
373 getStat(statHit, key).incrementAndGet();
374 creditSpace(blockSize, key);
375 return v;
376 }
377 }
378
379 getStat(statMiss, key).incrementAndGet();
380 boolean credit = true;
381 try {
382 v = file.readOneBlock(position, ctx, fileChannel.get());
383 credit = false;
384 } finally {
385 if (credit) {
386 creditSpace(blockSize, key);
387 }
388 }
389 if (position != v.start) {
390
391 position = v.start;
392 slot = slot(key, position);
393 e2 = table.get(slot);
394 }
395
396 Ref<DfsBlock> ref = new Ref<>(key, position, v.size(), v);
397 ref.hot = true;
398 for (;;) {
399 HashEntry n = new HashEntry(clean(e2), ref);
400 if (table.compareAndSet(slot, e2, n)) {
401 break;
402 }
403 e2 = table.get(slot);
404 }
405 addToClock(ref, blockSize - v.size());
406 } finally {
407 regionLock.unlock();
408 }
409
410
411
412 if (v.contains(file.key, requestedPosition)) {
413 return v;
414 }
415 return getOrLoad(file, requestedPosition, ctx, fileChannel);
416 }
417
418 @SuppressWarnings("unchecked")
419 private void reserveSpace(long reserve, DfsStreamKey key) {
420 clockLock.lock();
421 try {
422 long live = LongStream.of(getCurrentSize()).sum() + reserve;
423 if (maxBytes < live) {
424 Ref prev = clockHand;
425 Ref hand = clockHand.next;
426 do {
427 if (hand.hot) {
428
429
430 hand.hot = false;
431 prev = hand;
432 hand = hand.next;
433 continue;
434 } else if (prev == hand)
435 break;
436
437
438
439 Ref dead = hand;
440 hand = hand.next;
441 prev.next = hand;
442 dead.next = null;
443 dead.value = null;
444 live -= dead.size;
445 getStat(liveBytes, dead.key).addAndGet(-dead.size);
446 getStat(statEvict, dead.key).incrementAndGet();
447 } while (maxBytes < live);
448 clockHand = prev;
449 }
450 getStat(liveBytes, key).addAndGet(reserve);
451 } finally {
452 clockLock.unlock();
453 }
454 }
455
456 private void creditSpace(long credit, DfsStreamKey key) {
457 clockLock.lock();
458 try {
459 getStat(liveBytes, key).addAndGet(-credit);
460 } finally {
461 clockLock.unlock();
462 }
463 }
464
465 @SuppressWarnings("unchecked")
466 private void addToClock(Ref ref, long credit) {
467 clockLock.lock();
468 try {
469 if (credit != 0) {
470 getStat(liveBytes, ref.key).addAndGet(-credit);
471 }
472 Ref ptr = clockHand;
473 ref.next = ptr.next;
474 ptr.next = ref;
475 clockHand = ref;
476 } finally {
477 clockLock.unlock();
478 }
479 }
480
481 void put(DfsBlock v) {
482 put(v.stream, v.start, v.size(), v);
483 }
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498 <T> Ref<T> getOrLoadRef(
499 DfsStreamKey key, long position, RefLoader<T> loader)
500 throws IOException {
501 int slot = slot(key, position);
502 HashEntry e1 = table.get(slot);
503 Ref<T> ref = scanRef(e1, key, position);
504 if (ref != null) {
505 getStat(statHit, key).incrementAndGet();
506 return ref;
507 }
508
509 ReentrantLock regionLock = lockForRef(key);
510 long lockStart = System.currentTimeMillis();
511 regionLock.lock();
512 try {
513 HashEntry e2 = table.get(slot);
514 if (e2 != e1) {
515 ref = scanRef(e2, key, position);
516 if (ref != null) {
517 getStat(statHit, key).incrementAndGet();
518 return ref;
519 }
520 }
521
522 if (refLockWaitTime != null) {
523 refLockWaitTime.accept(
524 Long.valueOf(System.currentTimeMillis() - lockStart));
525 }
526 getStat(statMiss, key).incrementAndGet();
527 ref = loader.load();
528 ref.hot = true;
529
530 reserveSpace(ref.size, key);
531 for (;;) {
532 HashEntry n = new HashEntry(clean(e2), ref);
533 if (table.compareAndSet(slot, e2, n)) {
534 break;
535 }
536 e2 = table.get(slot);
537 }
538 addToClock(ref, 0);
539 } finally {
540 regionLock.unlock();
541 }
542 return ref;
543 }
544
545 <T> Ref<T> putRef(DfsStreamKey key, long size, T v) {
546 return put(key, 0, size, v);
547 }
548
549 <T> Ref<T> put(DfsStreamKey key, long pos, long size, T v) {
550 int slot = slot(key, pos);
551 HashEntry e1 = table.get(slot);
552 Ref<T> ref = scanRef(e1, key, pos);
553 if (ref != null) {
554 return ref;
555 }
556
557 reserveSpace(size, key);
558 ReentrantLock regionLock = lockFor(key, pos);
559 regionLock.lock();
560 try {
561 HashEntry e2 = table.get(slot);
562 if (e2 != e1) {
563 ref = scanRef(e2, key, pos);
564 if (ref != null) {
565 creditSpace(size, key);
566 return ref;
567 }
568 }
569
570 ref = new Ref<>(key, pos, size, v);
571 ref.hot = true;
572 for (;;) {
573 HashEntry n = new HashEntry(clean(e2), ref);
574 if (table.compareAndSet(slot, e2, n)) {
575 break;
576 }
577 e2 = table.get(slot);
578 }
579 addToClock(ref, 0);
580 } finally {
581 regionLock.unlock();
582 }
583 return ref;
584 }
585
586 boolean contains(DfsStreamKey key, long position) {
587 return scan(table.get(slot(key, position)), key, position) != null;
588 }
589
590 @SuppressWarnings("unchecked")
591 <T> T get(DfsStreamKey key, long position) {
592 T val = (T) scan(table.get(slot(key, position)), key, position);
593 if (val == null) {
594 getStat(statMiss, key).incrementAndGet();
595 } else {
596 getStat(statHit, key).incrementAndGet();
597 }
598 return val;
599 }
600
601 private <T> T scan(HashEntry n, DfsStreamKey key, long position) {
602 Ref<T> r = scanRef(n, key, position);
603 return r != null ? r.get() : null;
604 }
605
606 @SuppressWarnings("unchecked")
607 private <T> Ref<T> scanRef(HashEntry n, DfsStreamKey key, long position) {
608 for (; n != null; n = n.next) {
609 Ref<T> r = n.ref;
610 if (r.position == position && r.key.equals(key)) {
611 return r.get() != null ? r : null;
612 }
613 }
614 return null;
615 }
616
617 private int slot(DfsStreamKey key, long position) {
618 return (hash(key.hash, position) >>> 1) % tableSize;
619 }
620
621 private ReentrantLock lockFor(DfsStreamKey key, long position) {
622 return loadLocks[(hash(key.hash, position) >>> 1) % loadLocks.length];
623 }
624
625 private ReentrantLock lockForRef(DfsStreamKey key) {
626 return refLocks[(key.hash >>> 1) % refLocks.length];
627 }
628
629 private static AtomicLong[] newCounters() {
630 AtomicLong[] ret = new AtomicLong[PackExt.values().length];
631 for (int i = 0; i < ret.length; i++) {
632 ret[i] = new AtomicLong();
633 }
634 return ret;
635 }
636
637 private static AtomicLong getStat(AtomicReference<AtomicLong[]> stats,
638 DfsStreamKey key) {
639 int pos = key.packExtPos;
640 while (true) {
641 AtomicLong[] vals = stats.get();
642 if (pos < vals.length) {
643 return vals[pos];
644 }
645 AtomicLong[] expect = vals;
646 vals = new AtomicLong[Math.max(pos + 1, PackExt.values().length)];
647 System.arraycopy(expect, 0, vals, 0, expect.length);
648 for (int i = expect.length; i < vals.length; i++) {
649 vals[i] = new AtomicLong();
650 }
651 if (stats.compareAndSet(expect, vals)) {
652 return vals[pos];
653 }
654 }
655 }
656
657 private static long[] getStatVals(AtomicReference<AtomicLong[]> stat) {
658 AtomicLong[] stats = stat.get();
659 long[] cnt = new long[stats.length];
660 for (int i = 0; i < stats.length; i++) {
661 cnt[i] = stats[i].get();
662 }
663 return cnt;
664 }
665
666 private static HashEntry clean(HashEntry top) {
667 while (top != null && top.ref.next == null)
668 top = top.next;
669 if (top == null) {
670 return null;
671 }
672 HashEntry n = clean(top.next);
673 return n == top.next ? top : new HashEntry(n, top.ref);
674 }
675
676 private static final class HashEntry {
677
678 final HashEntry next;
679
680
681 final Ref ref;
682
683 HashEntry(HashEntry n, Ref r) {
684 next = n;
685 ref = r;
686 }
687 }
688
689 static final class Ref<T> {
690 final DfsStreamKey key;
691 final long position;
692 final long size;
693 volatile T value;
694 Ref next;
695 volatile boolean hot;
696
697 Ref(DfsStreamKey key, long position, long size, T v) {
698 this.key = key;
699 this.position = position;
700 this.size = size;
701 this.value = v;
702 }
703
704 T get() {
705 T v = value;
706 if (v != null) {
707 hot = true;
708 }
709 return v;
710 }
711
712 boolean has() {
713 return value != null;
714 }
715 }
716
717 @FunctionalInterface
718 interface RefLoader<T> {
719 Ref<T> load() throws IOException;
720 }
721
722
723
724
725 @FunctionalInterface
726 interface ReadableChannelSupplier {
727
728
729
730
731 ReadableChannel get() throws IOException;
732 }
733 }