View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.ketch;
12  
13  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
14  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
15  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
16  import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
17  import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
18  
19  import java.io.IOException;
20  import java.text.MessageFormat;
21  import java.util.ArrayList;
22  import java.util.Arrays;
23  import java.util.Collection;
24  import java.util.List;
25  import java.util.concurrent.locks.Lock;
26  import java.util.concurrent.locks.ReentrantLock;
27  
28  import org.eclipse.jgit.internal.storage.reftree.RefTree;
29  import org.eclipse.jgit.lib.ObjectId;
30  import org.eclipse.jgit.lib.Repository;
31  import org.eclipse.jgit.revwalk.RevCommit;
32  import org.eclipse.jgit.revwalk.RevWalk;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * A leader managing consensus across remote followers.
38   * <p>
39   * A leader instance starts up in
40   * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
41   * to begin a new term by sending an
42   * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
43   * term starts if a majority of replicas have accepted this leader instance for
44   * the term.
45   * <p>
46   * Once elected by a majority the instance enters
47   * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
48   * proposals offered to {@link #queueProposal(Proposal)}. This continues until
49   * the leader is timed out for inactivity, or is deposed by a competing leader
50   * gaining its own majority.
51   * <p>
52   * Once timed out or deposed this {@code KetchLeader} instance should be
53   * discarded, and a new instance takes over.
54   * <p>
55   * Each leader instance coordinates a group of
56   * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
57   * owned by the leader instance and must be discarded when the leader is
58   * discarded.
59   * <p>
60   * In Ketch all push requests are issued through the leader. The steps are as
61   * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
62   * example):
63   * <ul>
64   * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
65   * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
66   * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
67   * <li>Wait for consensus with
68   * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
69   * <li>To examine the status of the push, check
70   * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
71   * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
72   * </ul>
73   * <p>
74   * The leader gains consensus by first pushing the needed objects and a
75   * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
76   * desired target repository state to the {@code refs/txn/accepted} branch on
77   * each of the replicas. Once a majority has succeeded, the leader commits the
78   * state by either pushing the {@code refs/txn/accepted} value to
79   * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
80   * to {@code refs/heads/master}, etc. for stock Git replicas.
81   * <p>
82   * Internally, the actual transport to replicas is performed on background
83   * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
84   * executor service. For performance, the
85   * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
86   * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
87   * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
88   * and may invoke each other's methods on different threads. This access is
89   * protected by the leader's {@link #lock} object. Care must be taken to prevent
90   * concurrent access by correctly obtaining the leader's lock.
91   */
92  public abstract class KetchLeader {
93  	private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
94  
	/**
	 * Current state of the leader instance.
	 * <p>
	 * Within this class a new instance starts as {@link #CANDIDATE}, becomes
	 * {@link #LEADER} once a quorum of voters accepted its election round, and
	 * is moved to {@link #SHUTDOWN} by {@code shutdown()}. Only
	 * {@link #CANDIDATE} and {@link #LEADER} are able to run rounds.
	 */
	public enum State {
		/** Newly created instance trying to elect itself leader. */
		CANDIDATE,

		/** Leader instance elected by a majority. */
		LEADER,

		/** Instance has been deposed by another with a more recent term. */
		DEPOSED,

		/** Leader has been gracefully shut down, e.g. due to inactivity. */
		SHUTDOWN;
	}
109 
	/** Ketch system this leader runs in; supplies the executor for rounds. */
	private final KetchSystem system;

	/**
	 * Leader's knowledge of replicas for this repository.
	 * <p>
	 * Replicas with full participation; only these are counted when votes
	 * are tallied. Assigned by {@link #setReplicas(Collection)}.
	 */
	private KetchReplica[] voters;

	/**
	 * Replicas with {@code FOLLOWER_ONLY} participation. They receive pushes
	 * but are never counted in a vote. Assigned by
	 * {@link #setReplicas(Collection)}.
	 */
	private KetchReplica[] followers;

	/** The {@link LocalReplica} found among the {@link #voters}. */
	private LocalReplica self;

	/**
	 * Lock protecting all data within this leader instance.
	 * <p>
	 * This lock extends into the {@link KetchReplica} instances used by the
	 * leader. They share the same lock instance to simplify concurrency.
	 */
	final Lock lock;

	/** Current state of this instance; every instance starts as a candidate. */
	private State state = CANDIDATE;

	/** Term of this leader, once elected. */
	private long term;

	/**
	 * Pending proposals accepted into the queue in FIFO order.
	 * <p>
	 * These proposals were preflighted and do not contain any conflicts with
	 * each other and their expectations matched the leader's local view of the
	 * agreed upon {@code refs/txn/accepted} tree.
	 */
	private final List<Proposal> queued;

	/**
	 * State of the repository's RefTree after applying all entries in
	 * {@link #queued}. New proposals must be consistent with this tree to be
	 * appended to the end of {@link #queued}.
	 * <p>
	 * Must be deep-copied with {@link RefTree#copy()} if
	 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
	 */
	private RefTree refTree;

	/**
	 * If {@code true} {@link #refTree} must be duplicated before queuing the
	 * next proposal. The {@link #refTree} was passed into the constructor of a
	 * {@link ProposalRound}, and that external reference to the {@link RefTree}
	 * object is held by the proposal until it materializes the tree object in
	 * the object store. This field is set {@code true} when the proposal begins
	 * execution and set {@code false} once tree objects are persisted in the
	 * local repository's object store or {@link #refTree} is replaced with a
	 * copy to isolate it from any running rounds.
	 * <p>
	 * If proposals arrive less frequently than the {@code RefTree} is written
	 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
	 * avoids duplicating {@link #refTree}, reducing both time and memory used.
	 * However if proposals arrive more frequently {@link #refTree} must be
	 * duplicated to prevent newly queued proposals from corrupting the
	 * {@link #runningRound}.
	 */
	volatile boolean roundHoldsReferenceToRefTree;

	/** End of the leader's log. */
	private LogIndex headIndex;

	/** Leader knows this (and all prior) states are committed. */
	private LogIndex committedIndex;

	/**
	 * Is the leader idle with no work pending? If {@code true} there is no work
	 * for the leader (normal state). This field is {@code false} when the
	 * leader thread is scheduled for execution, or while {@link #runningRound}
	 * defines a round in progress.
	 */
	private boolean idle;

	/** Current round the leader is preparing and waiting for a vote on. */
	private Round runningRound;
184 
185 	/**
186 	 * Construct a leader for a Ketch instance.
187 	 *
188 	 * @param system
189 	 *            Ketch system configuration the leader must adhere to.
190 	 */
191 	protected KetchLeader(KetchSystem system) {
192 		this.system = system;
193 		this.lock = new ReentrantLock(true /* fair */);
194 		this.queued = new ArrayList<>(4);
195 		this.idle = true;
196 	}
197 
	/**
	 * Get the Ketch system this leader was constructed with.
	 *
	 * @return system configuration.
	 */
	KetchSystem getSystem() {
		return system;
	}
202 
203 	/**
204 	 * Configure the replicas used by this Ketch instance.
205 	 * <p>
206 	 * Replicas should be configured once at creation before any proposals are
207 	 * executed. Once elections happen, <b>reconfiguration is a complicated
208 	 * concept that is not currently supported</b>.
209 	 *
210 	 * @param replicas
211 	 *            members participating with the same repository.
212 	 */
213 	public void setReplicas(Collection<KetchReplica> replicas) {
214 		List<KetchReplica> v = new ArrayList<>(5);
215 		List<KetchReplica> f = new ArrayList<>(5);
216 		for (KetchReplica r : replicas) {
217 			switch (r.getParticipation()) {
218 			case FULL:
219 				v.add(r);
220 				break;
221 
222 			case FOLLOWER_ONLY:
223 				f.add(r);
224 				break;
225 			}
226 		}
227 
228 		Collection<Integer> validVoters = validVoterCounts();
229 		if (!validVoters.contains(Integer.valueOf(v.size()))) {
230 			throw new IllegalArgumentException(MessageFormat.format(
231 					KetchText.get().unsupportedVoterCount,
232 					Integer.valueOf(v.size()),
233 					validVoters));
234 		}
235 
236 		LocalReplica me = findLocal(v);
237 		if (me == null) {
238 			throw new IllegalArgumentException(
239 					KetchText.get().localReplicaRequired);
240 		}
241 
242 		lock.lock();
243 		try {
244 			voters = v.toArray(new KetchReplica[0]);
245 			followers = f.toArray(new KetchReplica[0]);
246 			self = me;
247 		} finally {
248 			lock.unlock();
249 		}
250 	}
251 
252 	private static Collection<Integer> validVoterCounts() {
253 		@SuppressWarnings("boxing")
254 		Integer[] valid = {
255 				// An odd number of voting replicas is required.
256 				1, 3, 5, 7, 9 };
257 		return Arrays.asList(valid);
258 	}
259 
260 	private static LocalReplica findLocal(Collection<KetchReplica> voters) {
261 		for (KetchReplica r : voters) {
262 			if (r instanceof LocalReplica) {
263 				return (LocalReplica) r;
264 			}
265 		}
266 		return null;
267 	}
268 
	/**
	 * Get an instance of the repository for use by a leader thread.
	 * <p>
	 * The caller will close the repository. Within this class the repository
	 * is opened once, when the leader initializes itself, and is closed again
	 * when that initialization finishes.
	 *
	 * @return opened repository for use by the leader thread.
	 * @throws java.io.IOException
	 *             cannot reopen the repository for the leader.
	 */
	protected abstract Repository openRepository() throws IOException;
279 
280 	/**
281 	 * Queue a reference update proposal for consensus.
282 	 * <p>
283 	 * This method does not wait for consensus to be reached. The proposal is
284 	 * checked to look for risks of conflicts, and then submitted into the queue
285 	 * for distribution as soon as possible.
286 	 * <p>
287 	 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
288 	 * to see if the proposal is done.
289 	 *
290 	 * @param proposal
291 	 *            the proposed reference updates to queue for consideration.
292 	 *            Once execution is complete the individual reference result
293 	 *            fields will be populated with the outcome.
294 	 * @throws java.lang.InterruptedException
295 	 *             current thread was interrupted. The proposal may have been
296 	 *             aborted if it was not yet queued for execution.
297 	 * @throws java.io.IOException
298 	 *             unrecoverable error preventing proposals from being attempted
299 	 *             by this leader.
300 	 */
301 	public void queueProposal(Proposal proposal)
302 			throws InterruptedException, IOException {
303 		try {
304 			lock.lockInterruptibly();
305 		} catch (InterruptedException e) {
306 			proposal.abort();
307 			throw e;
308 		}
309 		try {
310 			if (refTree == null) {
311 				initialize();
312 				for (Proposal p : queued) {
313 					refTree.apply(p.getCommands());
314 				}
315 			} else if (roundHoldsReferenceToRefTree) {
316 				refTree = refTree.copy();
317 				roundHoldsReferenceToRefTree = false;
318 			}
319 
320 			if (!refTree.apply(proposal.getCommands())) {
321 				// A conflict exists so abort the proposal.
322 				proposal.abort();
323 				return;
324 			}
325 
326 			queued.add(proposal);
327 			proposal.notifyState(QUEUED);
328 
329 			if (idle) {
330 				scheduleLeader();
331 			}
332 		} finally {
333 			lock.unlock();
334 		}
335 	}
336 
337 	private void initialize() throws IOException {
338 		try (Repository git = openRepository(); RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
339 			self.initialize(git);
340 
341 			ObjectId accepted = self.getTxnAccepted();
342 			if (!ObjectId.zeroId().equals(accepted)) {
343 				RevCommit c = rw.parseCommit(accepted);
344 				headIndex = LogIndex.unknown(accepted);
345 				refTree = RefTree.read(rw.getObjectReader(), c.getTree());
346 			} else {
347 				headIndex = LogIndex.unknown(ObjectId.zeroId());
348 				refTree = RefTree.newEmptyTree();
349 			}
350 		}
351 	}
352 
353 	private void scheduleLeader() {
354 		idle = false;
355 		system.getExecutor().execute(this::runLeader);
356 	}
357 
358 	private void runLeader() {
359 		Round round;
360 		lock.lock();
361 		try {
362 			switch (state) {
363 			case CANDIDATE:
364 				round = new ElectionRound(this, headIndex);
365 				break;
366 
367 			case LEADER:
368 				round = newProposalRound();
369 				break;
370 
371 			case DEPOSED:
372 			case SHUTDOWN:
373 			default:
374 				log.warn("Leader cannot run {}", state); //$NON-NLS-1$
375 				// TODO(sop): Redirect proposals.
376 				return;
377 			}
378 		} finally {
379 			lock.unlock();
380 		}
381 
382 		try {
383 			round.start();
384 		} catch (IOException e) {
385 			// TODO(sop) Depose leader if it cannot use its repository.
386 			log.error(KetchText.get().leaderFailedToStore, e);
387 			lock.lock();
388 			try {
389 				nextRound();
390 			} finally {
391 				lock.unlock();
392 			}
393 		}
394 	}
395 
396 	private ProposalRound newProposalRound() {
397 		List<Proposal> todo = new ArrayList<>(queued);
398 		queued.clear();
399 		roundHoldsReferenceToRefTree = true;
400 		return new ProposalRound(this, headIndex, todo, refTree);
401 	}
402 
	/**
	 * Get the term of this leader. The value is assigned when the election
	 * round reaches a quorum.
	 *
	 * @return term of this leader's reign.
	 */
	long getTerm() {
		return term;
	}
407 
	/**
	 * Get the end of the leader's log. The value is set during
	 * initialization and advanced each time a round begins transport.
	 *
	 * @return end of the leader's log.
	 */
	LogIndex getHead() {
		return headIndex;
	}
412 
	/**
	 * Get the most recent index known to be committed. The value is updated
	 * when a running round has been accepted by a quorum of voters.
	 *
	 * @return state leader knows it has committed across a quorum of replicas.
	 */
	LogIndex getCommitted() {
		return committedIndex;
	}
419 
	/**
	 * Check whether the leader has no work pending.
	 *
	 * @return {@code true} if the leader is neither scheduled for execution
	 *         nor running a round.
	 */
	boolean isIdle() {
		return idle;
	}
423 
424 	void runAsync(Round round) {
425 		lock.lock();
426 		try {
427 			// End of the log is this round. Once transport begins it is
428 			// reasonable to assume at least one replica will eventually get
429 			// this, and there is reasonable probability it commits.
430 			headIndex = round.acceptedNewIndex;
431 			runningRound = round;
432 
433 			for (KetchReplica replica : voters) {
434 				replica.pushTxnAcceptedAsync(round);
435 			}
436 			for (KetchReplica replica : followers) {
437 				replica.pushTxnAcceptedAsync(round);
438 			}
439 		} finally {
440 			lock.unlock();
441 		}
442 	}
443 
444 	/**
445 	 * Asynchronous signal from a replica after completion.
446 	 * <p>
447 	 * Must be called while {@link #lock} is held by the replica.
448 	 *
449 	 * @param replica
450 	 *            replica posting a completion event.
451 	 */
452 	void onReplicaUpdate(KetchReplica replica) {
453 		if (log.isDebugEnabled()) {
454 			log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
455 					replica.describeForLog(), snapshot());
456 		}
457 
458 		if (replica.getParticipation() == FOLLOWER_ONLY) {
459 			// Followers cannot vote, so votes haven't changed.
460 			return;
461 		} else if (runningRound == null) {
462 			// No round running, no need to tally votes.
463 			return;
464 		}
465 
466 		assert headIndex.equals(runningRound.acceptedNewIndex);
467 		int matching = 0;
468 		for (KetchReplica r : voters) {
469 			if (r.hasAccepted(headIndex)) {
470 				matching++;
471 			}
472 		}
473 
474 		int quorum = voters.length / 2 + 1;
475 		boolean success = matching >= quorum;
476 		if (!success) {
477 			return;
478 		}
479 
480 		switch (state) {
481 		case CANDIDATE:
482 			term = ((ElectionRound) runningRound).getTerm();
483 			state = LEADER;
484 			if (log.isDebugEnabled()) {
485 				log.debug("Won election, running term " + term); //$NON-NLS-1$
486 			}
487 
488 			//$FALL-THROUGH$
489 		case LEADER:
490 			committedIndex = headIndex;
491 			if (log.isDebugEnabled()) {
492 				log.debug("Committed {} in term {}", //$NON-NLS-1$
493 						committedIndex.describeForLog(),
494 						Long.valueOf(term));
495 			}
496 			nextRound();
497 			commitAsync(replica);
498 			notifySuccess(runningRound);
499 			if (log.isDebugEnabled()) {
500 				log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
501 			}
502 			break;
503 
504 		default:
505 			log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
506 			break;
507 		}
508 	}
509 
	/**
	 * Report success to a round without holding the leader lock.
	 * <p>
	 * The lock is released before {@code round.success()} is called and
	 * acquired again afterwards, even if that call throws. The caller
	 * therefore must hold {@link #lock} on entry and holds it again on
	 * return; other threads may have changed the leader's state in between.
	 *
	 * @param round
	 *            the round whose success is reported.
	 */
	private void notifySuccess(Round round) {
		// Drop the leader lock while notifying Proposal listeners.
		lock.unlock();
		try {
			round.success();
		} finally {
			// Restore the caller's expectation that the lock is held.
			lock.lock();
		}
	}
519 
520 	private void commitAsync(KetchReplica caller) {
521 		for (KetchReplica r : voters) {
522 			if (r == caller) {
523 				continue;
524 			}
525 			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
526 				r.pushCommitAsync(committedIndex);
527 			}
528 		}
529 		for (KetchReplica r : followers) {
530 			if (r == caller) {
531 				continue;
532 			}
533 			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
534 				r.pushCommitAsync(committedIndex);
535 			}
536 		}
537 	}
538 
539 	/** Schedule the next round; invoked while {@link #lock} is held. */
540 	void nextRound() {
541 		runningRound = null;
542 
543 		if (queued.isEmpty()) {
544 			idle = true;
545 		} else {
546 			// Caller holds lock. Reschedule leader on a new thread so
547 			// the call stack can unwind and lock is not held unexpectedly
548 			// during prepare for the next round.
549 			scheduleLeader();
550 		}
551 	}
552 
553 	/**
554 	 * Snapshot this leader
555 	 *
556 	 * @return snapshot of this leader
557 	 */
558 	public LeaderSnapshot snapshot() {
559 		lock.lock();
560 		try {
561 			LeaderSnapshot s = new LeaderSnapshot();
562 			s.state = state;
563 			s.term = term;
564 			s.headIndex = headIndex;
565 			s.committedIndex = committedIndex;
566 			s.idle = isIdle();
567 			for (KetchReplica r : voters) {
568 				s.replicas.add(r.snapshot());
569 			}
570 			for (KetchReplica r : followers) {
571 				s.replicas.add(r.snapshot());
572 			}
573 			return s;
574 		} finally {
575 			lock.unlock();
576 		}
577 	}
578 
579 	/**
580 	 * Gracefully shutdown this leader and cancel outstanding operations.
581 	 */
582 	public void shutdown() {
583 		lock.lock();
584 		try {
585 			if (state != SHUTDOWN) {
586 				state = SHUTDOWN;
587 				for (KetchReplica r : voters) {
588 					r.shutdown();
589 				}
590 				for (KetchReplica r : followers) {
591 					r.shutdown();
592 				}
593 			}
594 		} finally {
595 			lock.unlock();
596 		}
597 	}
598 
	/**
	 * {@inheritDoc}
	 * <p>
	 * The text is that of a fresh {@link #snapshot()}, so this call acquires
	 * the leader's lock.
	 */
	@Override
	public String toString() {
		return snapshot().toString();
	}
604 }