View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.ketch;
12  
13  import static java.util.concurrent.TimeUnit.MILLISECONDS;
14  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.BATCHED;
15  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitSpeed.FAST;
16  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.CURRENT;
17  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.LAGGING;
18  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.OFFLINE;
19  import static org.eclipse.jgit.internal.ketch.KetchReplica.State.UNKNOWN;
20  import static org.eclipse.jgit.lib.Constants.HEAD;
21  import static org.eclipse.jgit.lib.FileMode.TYPE_GITLINK;
22  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
23  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
24  import static org.eclipse.jgit.transport.ReceiveCommand.Type.CREATE;
25  
26  import java.io.IOException;
27  import java.lang.ref.WeakReference;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.HashMap;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.Future;
36  
37  import org.eclipse.jgit.annotations.NonNull;
38  import org.eclipse.jgit.annotations.Nullable;
39  import org.eclipse.jgit.internal.storage.reftree.RefTree;
40  import org.eclipse.jgit.lib.AnyObjectId;
41  import org.eclipse.jgit.lib.ObjectId;
42  import org.eclipse.jgit.lib.Ref;
43  import org.eclipse.jgit.lib.Repository;
44  import org.eclipse.jgit.revwalk.RevWalk;
45  import org.eclipse.jgit.transport.ReceiveCommand;
46  import org.eclipse.jgit.treewalk.TreeWalk;
47  import org.eclipse.jgit.util.FileUtils;
48  import org.eclipse.jgit.util.SystemReader;
49  import org.slf4j.Logger;
50  import org.slf4j.LoggerFactory;
51  
52  /**
53   * A Ketch replica, either {@link org.eclipse.jgit.internal.ketch.LocalReplica}
54   * or {@link org.eclipse.jgit.internal.ketch.RemoteGitReplica}.
55   * <p>
56   * Replicas can be either a stock Git replica, or a Ketch-aware replica.
57   * <p>
58   * A stock Git replica has no special knowledge of Ketch and simply stores
59   * objects and references. Ketch communicates with the stock Git replica using
60   * the Git push wire protocol. The
61   * {@link org.eclipse.jgit.internal.ketch.KetchLeader} commits an agreed upon
62   * state by pushing all references to the Git replica, for example
63   * {@code "refs/heads/master"} is pushed during commit. Stock Git replicas use
64   * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS} to
65   * record the final state.
66   * <p>
67   * Ketch-aware replicas understand the {@code RefTree} sent during the proposal
68   * and during commit are able to update their own reference space to match the
69   * state represented by the {@code RefTree}. Ketch-aware replicas typically use
70   * a {@link org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase} and
71   * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#TXN_COMMITTED}
72   * to record the final state.
73   * <p>
74   * KetchReplica instances are tightly coupled with a single
75   * {@link org.eclipse.jgit.internal.ketch.KetchLeader}. Some state may be
76   * accessed by the leader thread and uses the leader's own
77   * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} to protect shared
78   * data.
79   */
80  public abstract class KetchReplica {
81  	static final Logger log = LoggerFactory.getLogger(KetchReplica.class);
82  	private static final byte[] PEEL = { ' ', '^' };
83  
84  	/** Participation of a replica in establishing consensus. */
85  	public enum Participation {
86  		/** Replica can vote. */
87  		FULL,
88  
89  		/** Replica does not vote, but tracks leader. */
90  		FOLLOWER_ONLY;
91  	}
92  
93  	/** How this replica wants to receive Ketch commit operations. */
94  	public enum CommitMethod {
95  		/** All references are pushed to the peer as standard Git. */
96  		ALL_REFS,
97  
98  		/** Only {@code refs/txn/committed} is written/updated. */
99  		TXN_COMMITTED;
100 	}
101 
102 	/** Delay before committing to a replica. */
103 	public enum CommitSpeed {
104 		/**
105 		 * Send the commit immediately, even if it could be batched with the
106 		 * next proposal.
107 		 */
108 		FAST,
109 
110 		/**
111 		 * If the next proposal is available, batch the commit with it,
112 		 * otherwise just send the commit. This generates less network use, but
113 		 * may provide slower consistency on the replica.
114 		 */
115 		BATCHED;
116 	}
117 
118 	/** Current state of a replica. */
119 	public enum State {
120 		/** Leader has not yet contacted the replica. */
121 		UNKNOWN,
122 
123 		/** Replica is behind the consensus. */
124 		LAGGING,
125 
126 		/** Replica matches the consensus. */
127 		CURRENT,
128 
129 		/** Replica has a different (or unknown) history. */
130 		DIVERGENT,
131 
132 		/** Replica's history contains the leader's history. */
133 		AHEAD,
134 
135 		/** Replica can not be contacted. */
136 		OFFLINE;
137 	}
138 
139 	private final KetchLeader leader;
140 	private final String replicaName;
141 	private final Participation participation;
142 	private final CommitMethod commitMethod;
143 	private final CommitSpeed commitSpeed;
144 	private final long minRetryMillis;
145 	private final long maxRetryMillis;
146 	private final Map<ObjectId, List<ReceiveCommand>> staged;
147 	private final Map<String, ReceiveCommand> running;
148 	private final Map<String, ReceiveCommand> waiting;
149 	private final List<ReplicaPushRequest> queued;
150 
151 	/**
152 	 * Value known for {@code "refs/txn/accepted"}.
153 	 * <p>
154 	 * Raft literature refers to this as {@code matchIndex}.
155 	 */
156 	private ObjectId txnAccepted;
157 
158 	/**
159 	 * Value known for {@code "refs/txn/committed"}.
160 	 * <p>
161 	 * Raft literature refers to this as {@code commitIndex}. In traditional
162 	 * Raft this is a state variable inside the follower implementation, but
163 	 * Ketch keeps it in the leader.
164 	 */
165 	private ObjectId txnCommitted;
166 
167 	/** What is happening with this replica. */
168 	private State state = UNKNOWN;
169 	private String error;
170 
171 	/** Scheduled retry due to communication failure. */
172 	private Future<?> retryFuture;
173 	private long lastRetryMillis;
174 	private long retryAtMillis;
175 
176 	/**
177 	 * Configure a replica representation.
178 	 *
179 	 * @param leader
180 	 *            instance this replica follows.
181 	 * @param name
182 	 *            unique-ish name identifying this replica for debugging.
183 	 * @param cfg
184 	 *            how Ketch should treat the replica.
185 	 */
186 	protected KetchReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
187 		this.leader = leader;
188 		this.replicaName = name;
189 		this.participation = cfg.getParticipation();
190 		this.commitMethod = cfg.getCommitMethod();
191 		this.commitSpeed = cfg.getCommitSpeed();
192 		this.minRetryMillis = cfg.getMinRetry(MILLISECONDS);
193 		this.maxRetryMillis = cfg.getMaxRetry(MILLISECONDS);
194 		this.staged = new HashMap<>();
195 		this.running = new HashMap<>();
196 		this.waiting = new HashMap<>();
197 		this.queued = new ArrayList<>(4);
198 	}
199 
200 	/**
201 	 * Get system configuration.
202 	 *
203 	 * @return system configuration.
204 	 */
205 	public KetchSystem getSystem() {
206 		return getLeader().getSystem();
207 	}
208 
209 	/**
210 	 * Get leader instance this replica follows.
211 	 *
212 	 * @return leader instance this replica follows.
213 	 */
214 	public KetchLeader getLeader() {
215 		return leader;
216 	}
217 
218 	/**
219 	 * Get unique-ish name for debugging.
220 	 *
221 	 * @return unique-ish name for debugging.
222 	 */
223 	public String getName() {
224 		return replicaName;
225 	}
226 
227 	/**
228 	 * Get description of this replica for error/debug logging purposes.
229 	 *
230 	 * @return description of this replica for error/debug logging purposes.
231 	 */
232 	protected String describeForLog() {
233 		return getName();
234 	}
235 
236 	/**
237 	 * Get how the replica participates in this Ketch system.
238 	 *
239 	 * @return how the replica participates in this Ketch system.
240 	 */
241 	public Participation getParticipation() {
242 		return participation;
243 	}
244 
245 	/**
246 	 * Get how Ketch will commit to the repository.
247 	 *
248 	 * @return how Ketch will commit to the repository.
249 	 */
250 	public CommitMethod getCommitMethod() {
251 		return commitMethod;
252 	}
253 
254 	/**
255 	 * Get when Ketch will commit to the repository.
256 	 *
257 	 * @return when Ketch will commit to the repository.
258 	 */
259 	public CommitSpeed getCommitSpeed() {
260 		return commitSpeed;
261 	}
262 
263 	/**
264 	 * Called by leader to perform graceful shutdown.
265 	 * <p>
266 	 * Default implementation cancels any scheduled retry. Subclasses may add
267 	 * additional logic before or after calling {@code super.shutdown()}.
268 	 * <p>
269 	 * Called with {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held
270 	 * by caller.
271 	 */
272 	protected void shutdown() {
273 		Future<?> f = retryFuture;
274 		if (f != null) {
275 			retryFuture = null;
276 			f.cancel(true);
277 		}
278 	}
279 
280 	ReplicaSnapshot snapshot() {
281 		ReplicaSnapshot s = new ReplicaSnapshot(this);
282 		s.accepted = txnAccepted;
283 		s.committed = txnCommitted;
284 		s.state = state;
285 		s.error = error;
286 		s.retryAtMillis = waitingForRetry() ? retryAtMillis : 0;
287 		return s;
288 	}
289 
290 	/**
291 	 * Update the leader's view of the replica after a poll.
292 	 * <p>
293 	 * Called with {@link KetchLeader#lock} held by caller.
294 	 *
295 	 * @param refs
296 	 *            map of refs from the replica.
297 	 */
298 	void initialize(Map<String, Ref> refs) {
299 		if (txnAccepted == null) {
300 			txnAccepted = getId(refs.get(getSystem().getTxnAccepted()));
301 		}
302 		if (txnCommitted == null) {
303 			txnCommitted = getId(refs.get(getSystem().getTxnCommitted()));
304 		}
305 	}
306 
307 	ObjectId getTxnAccepted() {
308 		return txnAccepted;
309 	}
310 
311 	boolean hasAccepted(LogIndex id) {
312 		return equals(txnAccepted, id);
313 	}
314 
315 	private static boolean equals(@Nullable ObjectId a, LogIndex b) {
316 		return a != null && b != null && AnyObjectId.isEqual(a, b);
317 	}
318 
319 	/**
320 	 * Schedule a proposal round with the replica.
321 	 * <p>
322 	 * Called with {@link KetchLeader#lock} held by caller.
323 	 *
324 	 * @param round
325 	 *            current round being run by the leader.
326 	 */
327 	void pushTxnAcceptedAsync(Round round) {
328 		List<ReceiveCommand> cmds = new ArrayList<>();
329 		if (commitSpeed == BATCHED) {
330 			LogIndex committedIndex = leader.getCommitted();
331 			if (equals(txnAccepted, committedIndex)
332 					&& !equals(txnCommitted, committedIndex)) {
333 				prepareTxnCommitted(cmds, committedIndex);
334 			}
335 		}
336 
337 		// TODO(sop) Lagging replicas should build accept on the fly.
338 		if (round.stageCommands != null) {
339 			for (ReceiveCommand cmd : round.stageCommands) {
340 				// TODO(sop): Do not send certain object graphs to replica.
341 				cmds.add(copy(cmd));
342 			}
343 		}
344 		cmds.add(new ReceiveCommand(
345 				round.acceptedOldIndex, round.acceptedNewIndex,
346 				getSystem().getTxnAccepted()));
347 		pushAsync(new ReplicaPushRequest(this, cmds));
348 	}
349 
350 	private static ReceiveCommand../../../../org/eclipse/jgit/transport/ReceiveCommand.html#ReceiveCommand">ReceiveCommand copy(ReceiveCommand c) {
351 		return new ReceiveCommand(c.getOldId(), c.getNewId(), c.getRefName());
352 	}
353 
354 	boolean shouldPushUnbatchedCommit(LogIndex committed, boolean leaderIdle) {
355 		return (leaderIdle || commitSpeed == FAST) && hasAccepted(committed);
356 	}
357 
358 	void pushCommitAsync(LogIndex committed) {
359 		List<ReceiveCommand> cmds = new ArrayList<>();
360 		prepareTxnCommitted(cmds, committed);
361 		pushAsync(new ReplicaPushRequest(this, cmds));
362 	}
363 
364 	private void prepareTxnCommitted(List<ReceiveCommand> cmds,
365 			ObjectId committed) {
366 		removeStaged(cmds, committed);
367 		cmds.add(new ReceiveCommand(
368 				txnCommitted, committed,
369 				getSystem().getTxnCommitted()));
370 	}
371 
372 	private void removeStaged(List<ReceiveCommand> cmds, ObjectId committed) {
373 		List<ReceiveCommand> a = staged.remove(committed);
374 		if (a != null) {
375 			delete(cmds, a);
376 		}
377 		if (staged.isEmpty() || !(committed instanceof LogIndex)) {
378 			return;
379 		}
380 
381 		LogIndex committedIndex = (LogIndex) committed;
382 		Iterator<Map.Entry<ObjectId, List<ReceiveCommand>>> itr = staged
383 				.entrySet().iterator();
384 		while (itr.hasNext()) {
385 			Map.Entry<ObjectId, List<ReceiveCommand>> e = itr.next();
386 			if (e.getKey() instanceof LogIndex) {
387 				LogIndex stagedIndex = (LogIndex) e.getKey();
388 				if (stagedIndex.isBefore(committedIndex)) {
389 					delete(cmds, e.getValue());
390 					itr.remove();
391 				}
392 			}
393 		}
394 	}
395 
396 	private static void delete(List<ReceiveCommand> cmds,
397 			List<ReceiveCommand> createCmds) {
398 		for (ReceiveCommand cmd : createCmds) {
399 			ObjectId id = cmd.getNewId();
400 			String name = cmd.getRefName();
401 			cmds.add(new ReceiveCommand(id, ObjectId.zeroId(), name));
402 		}
403 	}
404 
405 	/**
406 	 * Determine the next push for this replica (if any) and start it.
407 	 * <p>
408 	 * If the replica has successfully accepted the committed state of the
409 	 * leader, this method will push all references to the replica using the
410 	 * configured {@link CommitMethod}.
411 	 * <p>
412 	 * If the replica is {@link State#LAGGING} this method will begin catch up
413 	 * by sending a more recent {@code refs/txn/accepted}.
414 	 * <p>
415 	 * Must be invoked with {@link KetchLeader#lock} held by caller.
416 	 */
417 	private void runNextPushRequest() {
418 		LogIndex committed = leader.getCommitted();
419 		if (!equals(txnCommitted, committed)
420 				&& shouldPushUnbatchedCommit(committed, leader.isIdle())) {
421 			pushCommitAsync(committed);
422 		}
423 
424 		if (queued.isEmpty() || !running.isEmpty() || waitingForRetry()) {
425 			return;
426 		}
427 
428 		// Collapse all queued requests into a single request.
429 		Map<String, ReceiveCommand> cmdMap = new HashMap<>();
430 		for (ReplicaPushRequest req : queued) {
431 			for (ReceiveCommand cmd : req.getCommands()) {
432 				String name = cmd.getRefName();
433 				ReceiveCommand old = cmdMap.remove(name);
434 				if (old != null) {
435 					cmd = new ReceiveCommand(
436 							old.getOldId(), cmd.getNewId(),
437 							name);
438 				}
439 				cmdMap.put(name, cmd);
440 			}
441 		}
442 		queued.clear();
443 		waiting.clear();
444 
445 		List<ReceiveCommand> next = new ArrayList<>(cmdMap.values());
446 		for (ReceiveCommand cmd : next) {
447 			running.put(cmd.getRefName(), cmd);
448 		}
449 		startPush(new ReplicaPushRequest(this, next));
450 	}
451 
452 	private void pushAsync(ReplicaPushRequest req) {
453 		if (defer(req)) {
454 			// TODO(sop) Collapse during long retry outage.
455 			for (ReceiveCommand cmd : req.getCommands()) {
456 				waiting.put(cmd.getRefName(), cmd);
457 			}
458 			queued.add(req);
459 		} else {
460 			for (ReceiveCommand cmd : req.getCommands()) {
461 				running.put(cmd.getRefName(), cmd);
462 			}
463 			startPush(req);
464 		}
465 	}
466 
467 	private boolean defer(ReplicaPushRequest req) {
468 		if (waitingForRetry()) {
469 			// Prior communication failure; everything is deferred.
470 			return true;
471 		}
472 
473 		for (ReceiveCommand nextCmd : req.getCommands()) {
474 			ReceiveCommand priorCmd = waiting.get(nextCmd.getRefName());
475 			if (priorCmd == null) {
476 				priorCmd = running.get(nextCmd.getRefName());
477 			}
478 			if (priorCmd != null) {
479 				// Another request pending on same ref; that must go first.
480 				// Verify priorCmd.newId == nextCmd.oldId?
481 				return true;
482 			}
483 		}
484 		return false;
485 	}
486 
487 	private boolean waitingForRetry() {
488 		Future<?> f = retryFuture;
489 		return f != null && !f.isDone();
490 	}
491 
492 	private void retryLater(ReplicaPushRequest req) {
493 		Collection<ReceiveCommand> cmds = req.getCommands();
494 		for (ReceiveCommand cmd : cmds) {
495 			cmd.setResult(NOT_ATTEMPTED, null);
496 			if (!waiting.containsKey(cmd.getRefName())) {
497 				waiting.put(cmd.getRefName(), cmd);
498 			}
499 		}
500 		queued.add(0, new ReplicaPushRequest(this, cmds));
501 
502 		if (!waitingForRetry()) {
503 			long delay = FileUtils
504 				.delay(lastRetryMillis, minRetryMillis, maxRetryMillis);
505 			if (log.isDebugEnabled()) {
506 				log.debug("Retrying {} after {} ms", //$NON-NLS-1$
507 						describeForLog(), Long.valueOf(delay));
508 			}
509 			lastRetryMillis = delay;
510 			retryAtMillis = SystemReader.getInstance().getCurrentTime() + delay;
511 			retryFuture = getSystem().getExecutor()
512 					.schedule(new WeakRetryPush(this), delay, MILLISECONDS);
513 		}
514 	}
515 
516 	/** Weakly holds a retrying replica, allowing it to garbage collect. */
517 	static class WeakRetryPush extends WeakReference<KetchReplica>
518 			implements Callable<Void> {
519 		WeakRetryPush(KetchReplica r) {
520 			super(r);
521 		}
522 
523 		@Override
524 		public Void call() throws Exception {
525 			KetchReplica r = get();
526 			if (r != null) {
527 				r.doRetryPush();
528 			}
529 			return null;
530 		}
531 	}
532 
533 	private void doRetryPush() {
534 		leader.lock.lock();
535 		try {
536 			retryFuture = null;
537 			runNextPushRequest();
538 		} finally {
539 			leader.lock.unlock();
540 		}
541 	}
542 
543 	/**
544 	 * Begin executing a single push.
545 	 * <p>
546 	 * This method must move processing onto another thread. Called with
547 	 * {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock} held by caller.
548 	 *
549 	 * @param req
550 	 *            the request to send to the replica.
551 	 */
552 	protected abstract void startPush(ReplicaPushRequest req);
553 
554 	/**
555 	 * Callback from {@link ReplicaPushRequest} upon success or failure.
556 	 * <p>
557 	 * Acquires the {@link KetchLeader#lock} and updates the leader's internal
558 	 * knowledge about this replica to reflect what has been learned during a
559 	 * push to the replica. In some cases of divergence this method may take
560 	 * some time to determine how the replica has diverged; to reduce contention
561 	 * this is evaluated before acquiring the leader lock.
562 	 *
563 	 * @param repo
564 	 *            local repository instance used by the push thread.
565 	 * @param req
566 	 *            push request just attempted.
567 	 */
568 	void afterPush(@Nullable Repository repo, ReplicaPushRequest req) {
569 		ReceiveCommand acceptCmd = null;
570 		ReceiveCommand commitCmd = null;
571 		List<ReceiveCommand> stages = null;
572 
573 		for (ReceiveCommand cmd : req.getCommands()) {
574 			String name = cmd.getRefName();
575 			if (name.equals(getSystem().getTxnAccepted())) {
576 				acceptCmd = cmd;
577 			} else if (name.equals(getSystem().getTxnCommitted())) {
578 				commitCmd = cmd;
579 			} else if (cmd.getResult() == OK && cmd.getType() == CREATE
580 					&& name.startsWith(getSystem().getTxnStage())) {
581 				if (stages == null) {
582 					stages = new ArrayList<>();
583 				}
584 				stages.add(cmd);
585 			}
586 		}
587 
588 		State newState = null;
589 		ObjectId acceptId = readId(req, acceptCmd);
590 		if (repo != null && acceptCmd != null && acceptCmd.getResult() != OK
591 				&& req.getException() == null) {
592 			try (LagCheckl/ketch/LagCheck.html#LagCheck">LagCheck lag = new LagCheck(this, repo)) {
593 				newState = lag.check(acceptId, acceptCmd);
594 				acceptId = lag.getRemoteId();
595 			}
596 		}
597 
598 		leader.lock.lock();
599 		try {
600 			for (ReceiveCommand cmd : req.getCommands()) {
601 				running.remove(cmd.getRefName());
602 			}
603 
604 			Throwable err = req.getException();
605 			if (err != null) {
606 				state = OFFLINE;
607 				error = err.toString();
608 				retryLater(req);
609 				leader.onReplicaUpdate(this);
610 				return;
611 			}
612 
613 			lastRetryMillis = 0;
614 			error = null;
615 			updateView(req, acceptId, commitCmd);
616 
617 			if (acceptCmd != null && acceptCmd.getResult() == OK) {
618 				state = hasAccepted(leader.getHead()) ? CURRENT : LAGGING;
619 				if (stages != null) {
620 					staged.put(acceptCmd.getNewId(), stages);
621 				}
622 			} else if (newState != null) {
623 				state = newState;
624 			}
625 
626 			leader.onReplicaUpdate(this);
627 			runNextPushRequest();
628 		} finally {
629 			leader.lock.unlock();
630 		}
631 	}
632 
633 	private void updateView(ReplicaPushRequest req, @Nullable ObjectId acceptId,
634 			ReceiveCommand commitCmd) {
635 		if (acceptId != null) {
636 			txnAccepted = acceptId;
637 		}
638 
639 		ObjectId committed = readId(req, commitCmd);
640 		if (committed != null) {
641 			txnCommitted = committed;
642 		} else if (acceptId != null && txnCommitted == null) {
643 			// Initialize during first conversation.
644 			Map<String, Ref> adv = req.getRefs();
645 			if (adv != null) {
646 				Ref refs = adv.get(getSystem().getTxnCommitted());
647 				txnCommitted = getId(refs);
648 			}
649 		}
650 	}
651 
652 	@Nullable
653 	private static ObjectId readId(ReplicaPushRequest req,
654 			@Nullable ReceiveCommand cmd) {
655 		if (cmd == null) {
656 			// Ref was not in the command list, do not trust advertisement.
657 			return null;
658 
659 		} else if (cmd.getResult() == OK) {
660 			// Currently at newId.
661 			return cmd.getNewId();
662 		}
663 
664 		Map<String, Ref> refs = req.getRefs();
665 		return refs != null ? getId(refs.get(cmd.getRefName())) : null;
666 	}
667 
668 	/**
669 	 * Fetch objects from the remote using the calling thread.
670 	 * <p>
671 	 * Called without {@link org.eclipse.jgit.internal.ketch.KetchLeader#lock}.
672 	 *
673 	 * @param repo
674 	 *            local repository to fetch objects into.
675 	 * @param req
676 	 *            the request to fetch from a replica.
677 	 * @throws java.io.IOException
678 	 *             communication with the replica was not possible.
679 	 */
680 	protected abstract void blockingFetch(Repository repo,
681 			ReplicaFetchRequest req) throws IOException;
682 
683 	/**
684 	 * Build a list of commands to commit
685 	 * {@link org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod#ALL_REFS}.
686 	 *
687 	 * @param git
688 	 *            local leader repository to read committed state from.
689 	 * @param current
690 	 *            all known references in the replica's repository. Typically
691 	 *            this comes from a push advertisement.
692 	 * @param committed
693 	 *            state being pushed to {@code refs/txn/committed}.
694 	 * @return commands to update during commit.
695 	 * @throws java.io.IOException
696 	 *             cannot read the committed state.
697 	 */
698 	protected Collection<ReceiveCommand> prepareCommit(Repository git,
699 			Map<String, Ref> current, ObjectId committed) throws IOException {
700 		List<ReceiveCommand> delta = new ArrayList<>();
701 		Map<String, Ref> remote = new HashMap<>(current);
702 		try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git);
703 				TreeWalk tw = new TreeWalk(rw.getObjectReader())) {
704 			tw.setRecursive(true);
705 			tw.addTree(rw.parseCommit(committed).getTree());
706 			while (tw.next()) {
707 				if (tw.getRawMode(0) != TYPE_GITLINK
708 						|| tw.isPathSuffix(PEEL, 2)) {
709 					// Symbolic references cannot be pushed.
710 					// Caching peeled values is handled remotely.
711 					continue;
712 				}
713 
714 				// TODO(sop) Do not send certain ref names to replica.
715 				String name = RefTree.refName(tw.getPathString());
716 				Ref oldRef = remote.remove(name);
717 				ObjectId oldId = getId(oldRef);
718 				ObjectId newId = tw.getObjectId(0);
719 				if (!AnyObjectId.isEqual(oldId, newId)) {
720 					delta.add(new ReceiveCommand(oldId, newId, name));
721 				}
722 			}
723 		}
724 
725 		// Delete any extra references not in the committed state.
726 		for (Ref ref : remote.values()) {
727 			if (canDelete(ref)) {
728 				delta.add(new ReceiveCommand(
729 					ref.getObjectId(), ObjectId.zeroId(),
730 					ref.getName()));
731 			}
732 		}
733 		return delta;
734 	}
735 
736 	boolean canDelete(Ref ref) {
737 		String name = ref.getName();
738 		if (HEAD.equals(name)) {
739 			return false;
740 		}
741 		if (name.startsWith(getSystem().getTxnNamespace())) {
742 			return false;
743 		}
744 		// TODO(sop) Do not delete precious names from replica.
745 		return true;
746 	}
747 
748 	@NonNull
749 	static ObjectId getId(@Nullable Ref ref) {
750 		if (ref != null) {
751 			ObjectId id = ref.getObjectId();
752 			if (id != null) {
753 				return id;
754 			}
755 		}
756 		return ObjectId.zeroId();
757 	}
758 }