1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static java.util.concurrent.TimeUnit.MILLISECONDS;
14 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
15 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
16 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
17 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
18 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
19 import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
20 import static org.eclipse.jgit.lib.Constants.HEAD;
21 import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
22 import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
23 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
24 import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
25
26 import java.io.IOException;
27 import java.lang.ref.WeakReference;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.Future;
36
37 import org.eclipse.jgit.annotations.NonNull;
38 import org.eclipse.jgit.annotations.Nullable;
39 import org.eclipse.jgit.internal.storage.reftree.RefTree;
40 import org.eclipse.jgit.lib.AnyObjectId;
41 import org.eclipse.jgit.lib.ObjectId;
42 import org.eclipse.jgit.lib.Ref;
43 import org.eclipse.jgit.lib.Repository;
44 import org.eclipse.jgit.revwalk.RevWalk;
45 import org.eclipse.jgit.transport.ReceiveCommand;
46 import org.eclipse.jgit.treewalk.TreeWalk;
47 import org.eclipse.jgit.util.FileUtils;
48 import org.eclipse.jgit.util.SystemReader;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80 public abstract class KetchReplica {
81 static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
82 private static final byte[] PEEL = { ' ', '^' };
83
84
85 public enum Participation {
86
87 FULL,
88
89
90 FOLLOWER_ONLY;
91 }
92
93
94 public enum CommitMethod {
95
96 ALL_REFS,
97
98
99 TXN_COMMITTED;
100 }
101
102
103 public enum CommitSpeed {
104
105
106
107
108 FAST,
109
110
111
112
113
114
115 BATCHED;
116 }
117
118
119 public enum State {
120
121 UNKNOWN,
122
123
124 LAGGING,
125
126
127 CURRENT,
128
129
130 DIVERGENT,
131
132
133 AHEAD,
134
135
136 OFFLINE;
137 }
138
139 private final KetchLeader leader;
140 private final String replicaName;
141 private final Participation participation;
142 private final CommitMethod commitMethod;
143 private final CommitSpeed commitSpeed;
144 private final long minRetryMillis;
145 private final long maxRetryMillis;
146 private final Map<ObjectId, List<ReceiveCommand>> staged;
147 private final Map<String, ReceiveCommand> running;
148 private final Map<String, ReceiveCommand> waiting;
149 private final List<ReplicaPushRequest> queued;
150
151
152
153
154
155
156 private ObjectId txnAccepted;
157
158
159
160
161
162
163
164
165 private ObjectId txnCommitted;
166
167
168 private State state = UNKNOWN;
169 private String error;
170
171
172 private Future<?> retryFuture;
173 private long lastRetryMillis;
174 private long retryAtMillis;
175
176
177
178
179
180
181
182
183
184
185
186 protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
187 this.leader = leader;
188 this.replicaName = name;
189 this.participation = cfg.getParticipation();
190 this.commitMethod = cfg.getCommitMethod();
191 this.commitSpeed = cfg.getCommitSpeed();
192 this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
193 this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
194 this.staged = new HashMap<>();
195 this.running = new HashMap<>();
196 this.waiting = new HashMap<>();
197 this.queued = new ArrayList<>(4);
198 }
199
200
201
202
203
204
205 public KetchSystem getSystem() {
206 return getLeader().getSystem();
207 }
208
209
210
211
212
213
214 public KetchLeader getLeader() {
215 return leader;
216 }
217
218
219
220
221
222
223 public String getName() {
224 return replicaName;
225 }
226
227
228
229
230
231
232 protected String describeForLog() {
233 return getName();
234 }
235
236
237
238
239
240
241 public Participation getParticipation() {
242 return participation;
243 }
244
245
246
247
248
249
250 public CommitMethod getCommitMethod() {
251 return commitMethod;
252 }
253
254
255
256
257
258
259 public CommitSpeed getCommitSpeed() {
260 return commitSpeed;
261 }
262
263
264
265
266
267
268
269
270
271
272 protected void shutdown() {
273 Future<?> f = retryFuture;
274 if (f != null) {
275 retryFuture = null;
276 f.cancel(true);
277 }
278 }
279
280 ReplicaSnapshot snapshot() {
281 ReplicaSnapshot s = new ReplicaSnapshot(this);
282 s.accepted = txnAccepted;
283 s.committed = txnCommitted;
284 s.state = state;
285 s.error = error;
286 s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
287 return s;
288 }
289
290
291
292
293
294
295
296
297
298 void initialize(Map<String, Ref> refs) {
299 if (txnAccepted == null) {
300 txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
301 }
302 if (txnCommitted == null) {
303 txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
304 }
305 }
306
307 ObjectId getTxnAccepted() {
308 return txnAccepted;
309 }
310
311 boolean hasAccepted(LogIndex id) {
312 return equals(txnAccepted, id);
313 }
314
315 private static boolean equals(@Nullable ObjectId a, LogIndex b) {
316 return a != null && b != null && AnyObjectId.isEqual(a, b);
317 }
318
319
320
321
322
323
324
325
326
327 void pushTxnAcceptedAsync(Round round) {
328 List<ReceiveCommand> cmds = new ArrayList<>();
329 if (commitSpeed == BATCHED) {
330 LogIndex committedIndex = leader.getCommitted();
331 if (equals(txnAccepted, committedIndex)
332 && !equals(txnCommitted, committedIndex)) {
333 prepareTxnCommitted(cmds, committedIndex);
334 }
335 }
336
337
338 if (round.stageCommands != null) {
339 for (ReceiveCommand cmd : round.stageCommands) {
340
341 cmds.add(copy(cmd));
342 }
343 }
344 cmds.add(new ReceiveCommand(
345 round.acceptedOldIndex, round.acceptedNewIndex,
346 getSystem().getTxnAccepted()));
347 pushAsync(new ReplicaPushRequest(this, cmds));
348 }
349
350 private static ReceiveCommand../../../../org/eclipse/jgit/transport/ReceiveCommand.html#ReceiveCommand">ReceiveCommand copy(ReceiveCommand c) {
351 return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
352 }
353
354 boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
355 return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
356 }
357
358 void pushCommitAsync(LogIndex committed) {
359 List<ReceiveCommand> cmds = new ArrayList<>();
360 prepareTxnCommitted(cmds, committed);
361 pushAsync(new ReplicaPushRequest(this, cmds));
362 }
363
364 private void prepareTxnCommitted(List<ReceiveCommand> cmds,
365 ObjectId committed) {
366 removeStaged(cmds, committed);
367 cmds.add(new ReceiveCommand(
368 txnCommitted, committed,
369 getSystem().getTxnCommitted()));
370 }
371
372 private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
373 List<ReceiveCommand> a = staged.remove(committed);
374 if (a != null) {
375 delete(cmds, a);
376 }
377 if (staged.isEmpty() || !(committed instanceof LogIndex)) {
378 return;
379 }
380
381 LogIndex committedIndex = (LogIndex) committed;
382 Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
383 .entrySet().iterator();
384 while (itr.hasNext()) {
385 Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
386 if (e.getKey() instanceof LogIndex) {
387 LogIndex stagedIndex = (LogIndex) e.getKey();
388 if (stagedIndex.isBefore(committedIndex)) {
389 delete(cmds, e.getValue());
390 itr.remove();
391 }
392 }
393 }
394 }
395
396 private static void delete(List<ReceiveCommand> cmds,
397 List<ReceiveCommand> createCmds) {
398 for (ReceiveCommand cmd : createCmds) {
399 ObjectId id = cmd.getNewId();
400 String name = cmd.getRefName();
401 cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
402 }
403 }
404
405
406
407
408
409
410
411
412
413
414
415
416
417 private void runNextPushRequest() {
418 LogIndex committed = leader.getCommitted();
419 if (!equals(txnCommitted, committed)
420 && shouldPushUnbatchedCommit(committed, leader.isIdle())) {
421 pushCommitAsync(committed);
422 }
423
424 if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
425 return;
426 }
427
428
429 Map<String, ReceiveCommand> cmdMap = new HashMap<>();
430 for (ReplicaPushRequest req : queued) {
431 for (ReceiveCommand cmd : req.getCommands()) {
432 String name = cmd.getRefName();
433 ReceiveCommand old = cmdMap.remove(name);
434 if (old != null) {
435 cmd = new ReceiveCommand(
436 old.getOldId(), cmd.getNewId(),
437 name);
438 }
439 cmdMap.put(name, cmd);
440 }
441 }
442 queued.clear();
443 waiting.clear();
444
445 List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
446 for (ReceiveCommand cmd : next) {
447 running.put(cmd.getRefName(), cmd);
448 }
449 startPush(new ReplicaPushRequest(this, next));
450 }
451
452 private void pushAsync(ReplicaPushRequest req) {
453 if (defer(req)) {
454
455 for (ReceiveCommand cmd : req.getCommands()) {
456 waiting.put(cmd.getRefName(), cmd);
457 }
458 queued.add(req);
459 } else {
460 for (ReceiveCommand cmd : req.getCommands()) {
461 running.put(cmd.getRefName(), cmd);
462 }
463 startPush(req);
464 }
465 }
466
467 private boolean defer(ReplicaPushRequest req) {
468 if (waitingForRetry()) {
469
470 return true;
471 }
472
473 for (ReceiveCommand nextCmd : req.getCommands()) {
474 ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
475 if (priorCmd == null) {
476 priorCmd = running.get(nextCmd.getRefName());
477 }
478 if (priorCmd != null) {
479
480
481 return true;
482 }
483 }
484 return false;
485 }
486
487 private boolean waitingForRetry() {
488 Future<?> f = retryFuture;
489 return f != null && !f.isDone();
490 }
491
492 private void retryLater(ReplicaPushRequest req) {
493 Collection<ReceiveCommand> cmds = req.getCommands();
494 for (ReceiveCommand cmd : cmds) {
495 cmd.setResult(NOT_ATTEMPTED, null);
496 if (!waiting.containsKey(cmd.getRefName())) {
497 waiting.put(cmd.getRefName(), cmd);
498 }
499 }
500 queued.add(0, new ReplicaPushRequest(this, cmds));
501
502 if (!waitingForRetry()) {
503 long delay = FileUtils
504 .delay(lastRetryMillis, minRetryMillis, maxRetryMillis);
505 if (log.isDebugEnabled()) {
506 log.debug("Retrying {} after {} ms",
507 describeForLog(), Long.valueOf(delay));
508 }
509 lastRetryMillis = delay;
510 retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
511 retryFuture = getSystem().getExecutor()
512 .schedule(new WeakRetryPush(this), delay, MILLISECONDS);
513 }
514 }
515
516
517 static class WeakRetryPush extends WeakReference<KetchReplica>
518 implements Callable<Void> {
519 WeakRetryPush(KetchReplica r) {
520 super(r);
521 }
522
523 @Override
524 public Void call() throws Exception {
525 KetchReplica r = get();
526 if (r != null) {
527 r.doRetryPush();
528 }
529 return null;
530 }
531 }
532
533 private void doRetryPush() {
534 leader.lock.lock();
535 try {
536 retryFuture = null;
537 runNextPushRequest();
538 } finally {
539 leader.lock.unlock();
540 }
541 }
542
543
544
545
546
547
548
549
550
551
552 protected abstract void startPush(ReplicaPushRequest req);
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568 void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
569 ReceiveCommand acceptCmd = null;
570 ReceiveCommand commitCmd = null;
571 List<ReceiveCommand> stages = null;
572
573 for (ReceiveCommand cmd : req.getCommands()) {
574 String name = cmd.getRefName();
575 if (name.equals(getSystem().getTxnAccepted())) {
576 acceptCmd = cmd;
577 } else if (name.equals(getSystem().getTxnCommitted())) {
578 commitCmd = cmd;
579 } else if (cmd.getResult() == OK && cmd.getType() == CREATE
580 && name.startsWith(getSystem().getTxnStage())) {
581 if (stages == null) {
582 stages = new ArrayList<>();
583 }
584 stages.add(cmd);
585 }
586 }
587
588 State newState = null;
589 ObjectId acceptId = readId(req, acceptCmd);
590 if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
591 && req.getException() == null) {
592 try (LagCheckl/ketch/LagCheck.html#LagCheck">LagCheck lag = new LagCheck(this, repo)) {
593 newState = lag.check(acceptId, acceptCmd);
594 acceptId = lag.getRemoteId();
595 }
596 }
597
598 leader.lock.lock();
599 try {
600 for (ReceiveCommand cmd : req.getCommands()) {
601 running.remove(cmd.getRefName());
602 }
603
604 Throwable err = req.getException();
605 if (err != null) {
606 state = OFFLINE;
607 error = err.toString();
608 retryLater(req);
609 leader.onReplicaUpdate(this);
610 return;
611 }
612
613 lastRetryMillis = 0;
614 error = null;
615 updateView(req, acceptId, commitCmd);
616
617 if (acceptCmd != null && acceptCmd.getResult() == OK) {
618 state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
619 if (stages != null) {
620 staged.put(acceptCmd.getNewId(), stages);
621 }
622 } else if (newState != null) {
623 state = newState;
624 }
625
626 leader.onReplicaUpdate(this);
627 runNextPushRequest();
628 } finally {
629 leader.lock.unlock();
630 }
631 }
632
633 private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
634 ReceiveCommand commitCmd) {
635 if (acceptId != null) {
636 txnAccepted = acceptId;
637 }
638
639 ObjectId committed = readId(req, commitCmd);
640 if (committed != null) {
641 txnCommitted = committed;
642 } else if (acceptId != null && txnCommitted == null) {
643
644 Map<String, Ref> adv = req.getRefs();
645 if (adv != null) {
646 Ref refs = adv.get(getSystem().getTxnCommitted());
647 txnCommitted = getId(refs);
648 }
649 }
650 }
651
652 @Nullable
653 private static ObjectId readId(ReplicaPushRequest req,
654 @Nullable ReceiveCommand cmd) {
655 if (cmd == null) {
656
657 return null;
658
659 } else if (cmd.getResult() == OK) {
660
661 return cmd.getNewId();
662 }
663
664 Map<String, Ref> refs = req.getRefs();
665 return refs != null ? getId(refs.get(cmd.getRefName())) : null;
666 }
667
668
669
670
671
672
673
674
675
676
677
678
679
680 protected abstract void blockingFetch(Repository repo,
681 ReplicaFetchRequest req) throws IOException;
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698 protected Collection<ReceiveCommand> prepareCommit(Repository git,
699 Map<String, Ref> current, ObjectId committed) throws IOException {
700 List<ReceiveCommand> delta = new ArrayList<>();
701 Map<String, Ref> remote = new HashMap<>(current);
702 try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git);
703 TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
704 tw.setRecursive(true);
705 tw.addTree(rw.parseCommit(committed).getTree());
706 while (tw.next()) {
707 if (tw.getRawMode(0) != TYPE_GITLINK
708 || tw.isPathSuffix(PEEL, 2)) {
709
710
711 continue;
712 }
713
714
715 String name = RefTree.refName(tw.getPathString());
716 Ref oldRef = remote.remove(name);
717 ObjectId oldId = getId(oldRef);
718 ObjectId newId = tw.getObjectId(0);
719 if (!AnyObjectId.isEqual(oldId, newId)) {
720 delta.add(new ReceiveCommand(oldId, newId, name));
721 }
722 }
723 }
724
725
726 for (Ref ref : remote.values()) {
727 if (canDelete(ref)) {
728 delta.add(new ReceiveCommand(
729 ref.getObjectId(), ObjectId.zeroId(),
730 ref.getName()));
731 }
732 }
733 return delta;
734 }
735
736 boolean canDelete(Ref ref) {
737 String name = ref.getName();
738 if (HEAD.equals(name)) {
739 return false;
740 }
741 if (name.startsWith(getSystem().getTxnNamespace())) {
742 return false;
743 }
744
745 return true;
746 }
747
748 @NonNull
749 static ObjectId getId(@Nullable Ref ref) {
750 if (ref != null) {
751 ObjectId id = ref.getObjectId();
752 if (id != null) {
753 return id;
754 }
755 }
756 return ObjectId.zeroId();
757 }
758 }