View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
47  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
48  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
49  import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
50  import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
51  
52  import java.io.IOException;
53  import java.text.MessageFormat;
54  import java.util.ArrayList;
55  import java.util.Arrays;
56  import java.util.Collection;
57  import java.util.List;
58  import java.util.concurrent.locks.Lock;
59  import java.util.concurrent.locks.ReentrantLock;
60  
61  import org.eclipse.jgit.internal.storage.reftree.RefTree;
62  import org.eclipse.jgit.lib.ObjectId;
63  import org.eclipse.jgit.lib.Repository;
64  import org.eclipse.jgit.revwalk.RevCommit;
65  import org.eclipse.jgit.revwalk.RevWalk;
66  import org.slf4j.Logger;
67  import org.slf4j.LoggerFactory;
68  
69  /**
70   * A leader managing consensus across remote followers.
71   * <p>
72   * A leader instance starts up in {@link State#CANDIDATE} and tries to begin a
73   * new term by sending an {@link ElectionRound} to all replicas. Its term starts
74   * if a majority of replicas have accepted this leader instance for the term.
75   * <p>
76   * Once elected by a majority the instance enters {@link State#LEADER} and runs
77   * proposals offered to {@link #queueProposal(Proposal)}. This continues until
78   * the leader is timed out for inactivity, or is deposed by a competing leader
79   * gaining its own majority.
80   * <p>
81   * Once timed out or deposed this {@code KetchLeader} instance should be
82   * discarded, and a new instance takes over.
83   * <p>
84   * Each leader instance coordinates a group of {@link KetchReplica}s. Replica
85   * instances are owned by the leader instance and must be discarded when the
86   * leader is discarded.
87   * <p>
88   * In Ketch all push requests are issued through the leader. The steps are as
89   * follows (see {@link KetchPreReceive} for an example):
90   * <ul>
91   * <li>Create a {@link Proposal} with the
92   * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
93   * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
94   * <li>Wait for consensus with {@link Proposal#await()}.
95   * <li>To examine the status of the push, check {@link Proposal#getCommands()},
96   * looking at
97   * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
98   * </ul>
99   * <p>
100  * The leader gains consensus by first pushing the needed objects and a
101  * {@link RefTree} representing the desired target repository state to the
102  * {@code refs/txn/accepted} branch on each of the replicas. Once a majority has
103  * succeeded, the leader commits the state by either pushing the
104  * {@code refs/txn/accepted} value to {@code refs/txn/committed} (for
105  * Ketch-aware replicas) or by pushing updates to {@code refs/heads/master},
106  * etc. for stock Git replicas.
107  * <p>
108  * Internally, the actual transport to replicas is performed on background
109  * threads via the {@link KetchSystem}'s executor service. For performance, the
110  * {@link KetchLeader}, {@link KetchReplica} and {@link Proposal} objects share
111  * some state, and may invoke each other's methods on different threads. This
112  * access is protected by the leader's {@link #lock} object. Care must be taken
113  * to prevent concurrent access by correctly obtaining the leader's lock.
114  */
115 public abstract class KetchLeader {
116 	private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
117 
118 	/** Current state of the leader instance. */
119 	public static enum State {
120 		/** Newly created instance trying to elect itself leader. */
121 		CANDIDATE,
122 
123 		/** Leader instance elected by a majority. */
124 		LEADER,
125 
126 		/** Instance has been deposed by another with a more recent term. */
127 		DEPOSED,
128 
129 		/** Leader has been gracefully shutdown, e.g. due to inactivity. */
130 		SHUTDOWN;
131 	}
132 
133 	private final KetchSystem system;
134 
135 	/** Leader's knowledge of replicas for this repository. */
136 	private KetchReplica[] voters;
137 	private KetchReplica[] followers;
138 	private LocalReplica self;
139 
140 	/**
141 	 * Lock protecting all data within this leader instance.
142 	 * <p>
143 	 * This lock extends into the {@link KetchReplica} instances used by the
144 	 * leader. They share the same lock instance to simplify concurrency.
145 	 */
146 	final Lock lock;
147 
148 	private State state = CANDIDATE;
149 
150 	/** Term of this leader, once elected. */
151 	private long term;
152 
153 	/**
154 	 * Pending proposals accepted into the queue in FIFO order.
155 	 * <p>
156 	 * These proposals were preflighted and do not contain any conflicts with
157 	 * each other and their expectations matched the leader's local view of the
158 	 * agreed upon {@code refs/txn/accepted} tree.
159 	 */
160 	private final List<Proposal> queued;
161 
162 	/**
163 	 * State of the repository's RefTree after applying all entries in
164 	 * {@link #queued}. New proposals must be consistent with this tree to be
165 	 * appended to the end of {@link #queued}.
166 	 * <p>
167 	 * Must be deep-copied with {@link RefTree#copy()} if
168 	 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
169 	 */
170 	private RefTree refTree;
171 
172 	/**
173 	 * If {@code true} {@link #refTree} must be duplicated before queuing the
174 	 * next proposal. The {@link #refTree} was passed into the constructor of a
175 	 * {@link ProposalRound}, and that external reference to the {@link RefTree}
176 	 * object is held by the proposal until it materializes the tree object in
177 	 * the object store. This field is set {@code true} when the proposal begins
178 	 * execution and set {@code false} once tree objects are persisted in the
179 	 * local repository's object store or {@link #refTree} is replaced with a
180 	 * copy to isolate it from any running rounds.
181 	 * <p>
182 	 * If proposals arrive less frequently than the {@code RefTree} is written
183 	 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
184 	 * avoids duplicating {@link #refTree}, reducing both time and memory used.
185 	 * However if proposals arrive more frequently {@link #refTree} must be
186 	 * duplicated to prevent newly queued proposals from corrupting the
187 	 * {@link #runningRound}.
188 	 */
189 	volatile boolean roundHoldsReferenceToRefTree;
190 
191 	/** End of the leader's log. */
192 	private LogIndex headIndex;
193 
194 	/** Leader knows this (and all prior) states are committed. */
195 	private LogIndex committedIndex;
196 
197 	/**
198 	 * Is the leader idle with no work pending? If {@code true} there is no work
199 	 * for the leader (normal state). This field is {@code false} when the
200 	 * leader thread is scheduled for execution, or while {@link #runningRound}
201 	 * defines a round in progress.
202 	 */
203 	private boolean idle;
204 
205 	/** Current round the leader is preparing and waiting for a vote on. */
206 	private Round runningRound;
207 
208 	/**
209 	 * Construct a leader for a Ketch instance.
210 	 *
211 	 * @param system
212 	 *            Ketch system configuration the leader must adhere to.
213 	 */
214 	protected KetchLeader(KetchSystem system) {
215 		this.system = system;
216 		this.lock = new ReentrantLock(true /* fair */);
217 		this.queued = new ArrayList<>(4);
218 		this.idle = true;
219 	}
220 
221 	/** @return system configuration. */
222 	KetchSystem getSystem() {
223 		return system;
224 	}
225 
226 	/**
227 	 * Configure the replicas used by this Ketch instance.
228 	 * <p>
229 	 * Replicas should be configured once at creation before any proposals are
230 	 * executed. Once elections happen, <b>reconfiguration is a complicated
231 	 * concept that is not currently supported</b>.
232 	 *
233 	 * @param replicas
234 	 *            members participating with the same repository.
235 	 */
236 	public void setReplicas(Collection<KetchReplica> replicas) {
237 		List<KetchReplica> v = new ArrayList<>(5);
238 		List<KetchReplica> f = new ArrayList<>(5);
239 		for (KetchReplica r : replicas) {
240 			switch (r.getParticipation()) {
241 			case FULL:
242 				v.add(r);
243 				break;
244 
245 			case FOLLOWER_ONLY:
246 				f.add(r);
247 				break;
248 			}
249 		}
250 
251 		Collection<Integer> validVoters = validVoterCounts();
252 		if (!validVoters.contains(Integer.valueOf(v.size()))) {
253 			throw new IllegalArgumentException(MessageFormat.format(
254 					KetchText.get().unsupportedVoterCount,
255 					Integer.valueOf(v.size()),
256 					validVoters));
257 		}
258 
259 		LocalReplica me = findLocal(v);
260 		if (me == null) {
261 			throw new IllegalArgumentException(
262 					KetchText.get().localReplicaRequired);
263 		}
264 
265 		lock.lock();
266 		try {
267 			voters = v.toArray(new KetchReplica[v.size()]);
268 			followers = f.toArray(new KetchReplica[f.size()]);
269 			self = me;
270 		} finally {
271 			lock.unlock();
272 		}
273 	}
274 
275 	private static Collection<Integer> validVoterCounts() {
276 		@SuppressWarnings("boxing")
277 		Integer[] valid = {
278 				// An odd number of voting replicas is required.
279 				1, 3, 5, 7, 9 };
280 		return Arrays.asList(valid);
281 	}
282 
283 	private static LocalReplica findLocal(Collection<KetchReplica> voters) {
284 		for (KetchReplica r : voters) {
285 			if (r instanceof LocalReplica) {
286 				return (LocalReplica) r;
287 			}
288 		}
289 		return null;
290 	}
291 
292 	/**
293 	 * Get an instance of the repository for use by a leader thread.
294 	 * <p>
295 	 * The caller will close the repository.
296 	 *
297 	 * @return opened repository for use by the leader thread.
298 	 * @throws IOException
299 	 *             cannot reopen the repository for the leader.
300 	 */
301 	protected abstract Repository openRepository() throws IOException;
302 
303 	/**
304 	 * Queue a reference update proposal for consensus.
305 	 * <p>
306 	 * This method does not wait for consensus to be reached. The proposal is
307 	 * checked to look for risks of conflicts, and then submitted into the queue
308 	 * for distribution as soon as possible.
309 	 * <p>
310 	 * Callers must use {@link Proposal#await()} to see if the proposal is done.
311 	 *
312 	 * @param proposal
313 	 *            the proposed reference updates to queue for consideration.
314 	 *            Once execution is complete the individual reference result
315 	 *            fields will be populated with the outcome.
316 	 * @throws InterruptedException
317 	 *             current thread was interrupted. The proposal may have been
318 	 *             aborted if it was not yet queued for execution.
319 	 * @throws IOException
320 	 *             unrecoverable error preventing proposals from being attempted
321 	 *             by this leader.
322 	 */
323 	public void queueProposal(Proposal proposal)
324 			throws InterruptedException, IOException {
325 		try {
326 			lock.lockInterruptibly();
327 		} catch (InterruptedException e) {
328 			proposal.abort();
329 			throw e;
330 		}
331 		try {
332 			if (refTree == null) {
333 				initialize();
334 				for (Proposal p : queued) {
335 					refTree.apply(p.getCommands());
336 				}
337 			} else if (roundHoldsReferenceToRefTree) {
338 				refTree = refTree.copy();
339 				roundHoldsReferenceToRefTree = false;
340 			}
341 
342 			if (!refTree.apply(proposal.getCommands())) {
343 				// A conflict exists so abort the proposal.
344 				proposal.abort();
345 				return;
346 			}
347 
348 			queued.add(proposal);
349 			proposal.notifyState(QUEUED);
350 
351 			if (idle) {
352 				scheduleLeader();
353 			}
354 		} finally {
355 			lock.unlock();
356 		}
357 	}
358 
359 	private void initialize() throws IOException {
360 		try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
361 			self.initialize(git);
362 
363 			ObjectId accepted = self.getTxnAccepted();
364 			if (!ObjectId.zeroId().equals(accepted)) {
365 				RevCommit c = rw.parseCommit(accepted);
366 				headIndex = LogIndex.unknown(accepted);
367 				refTree = RefTree.read(rw.getObjectReader(), c.getTree());
368 			} else {
369 				headIndex = LogIndex.unknown(ObjectId.zeroId());
370 				refTree = RefTree.newEmptyTree();
371 			}
372 		}
373 	}
374 
375 	private void scheduleLeader() {
376 		idle = false;
377 		system.getExecutor().execute(new Runnable() {
378 			@Override
379 			public void run() {
380 				runLeader();
381 			}
382 		});
383 	}
384 
385 	private void runLeader() {
386 		Round round;
387 		lock.lock();
388 		try {
389 			switch (state) {
390 			case CANDIDATE:
391 				round = new ElectionRound(this, headIndex);
392 				break;
393 
394 			case LEADER:
395 				round = newProposalRound();
396 				break;
397 
398 			case DEPOSED:
399 			case SHUTDOWN:
400 			default:
401 				log.warn("Leader cannot run {}", state); //$NON-NLS-1$
402 				// TODO(sop): Redirect proposals.
403 				return;
404 			}
405 		} finally {
406 			lock.unlock();
407 		}
408 
409 		try {
410 			round.start();
411 		} catch (IOException e) {
412 			// TODO(sop) Depose leader if it cannot use its repository.
413 			log.error(KetchText.get().leaderFailedToStore, e);
414 			lock.lock();
415 			try {
416 				nextRound();
417 			} finally {
418 				lock.unlock();
419 			}
420 		}
421 	}
422 
423 	private ProposalRound newProposalRound() {
424 		List<Proposal> todo = new ArrayList<>(queued);
425 		queued.clear();
426 		roundHoldsReferenceToRefTree = true;
427 		return new ProposalRound(this, headIndex, todo, refTree);
428 	}
429 
430 	/** @return term of this leader's reign. */
431 	long getTerm() {
432 		return term;
433 	}
434 
435 	/** @return end of the leader's log. */
436 	LogIndex getHead() {
437 		return headIndex;
438 	}
439 
440 	/**
441 	 * @return state leader knows it has committed across a quorum of replicas.
442 	 */
443 	LogIndex getCommitted() {
444 		return committedIndex;
445 	}
446 
447 	boolean isIdle() {
448 		return idle;
449 	}
450 
451 	void runAsync(Round round) {
452 		lock.lock();
453 		try {
454 			// End of the log is this round. Once transport begins it is
455 			// reasonable to assume at least one replica will eventually get
456 			// this, and there is reasonable probability it commits.
457 			headIndex = round.acceptedNewIndex;
458 			runningRound = round;
459 
460 			for (KetchReplica replica : voters) {
461 				replica.pushTxnAcceptedAsync(round);
462 			}
463 			for (KetchReplica replica : followers) {
464 				replica.pushTxnAcceptedAsync(round);
465 			}
466 		} finally {
467 			lock.unlock();
468 		}
469 	}
470 
471 	/**
472 	 * Asynchronous signal from a replica after completion.
473 	 * <p>
474 	 * Must be called while {@link #lock} is held by the replica.
475 	 *
476 	 * @param replica
477 	 *            replica posting a completion event.
478 	 */
479 	void onReplicaUpdate(KetchReplica replica) {
480 		if (log.isDebugEnabled()) {
481 			log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
482 					replica.describeForLog(), snapshot());
483 		}
484 
485 		if (replica.getParticipation() == FOLLOWER_ONLY) {
486 			// Followers cannot vote, so votes haven't changed.
487 			return;
488 		} else if (runningRound == null) {
489 			// No round running, no need to tally votes.
490 			return;
491 		}
492 
493 		assert headIndex.equals(runningRound.acceptedNewIndex);
494 		int matching = 0;
495 		for (KetchReplica r : voters) {
496 			if (r.hasAccepted(headIndex)) {
497 				matching++;
498 			}
499 		}
500 
501 		int quorum = voters.length / 2 + 1;
502 		boolean success = matching >= quorum;
503 		if (!success) {
504 			return;
505 		}
506 
507 		switch (state) {
508 		case CANDIDATE:
509 			term = ((ElectionRound) runningRound).getTerm();
510 			state = LEADER;
511 			if (log.isDebugEnabled()) {
512 				log.debug("Won election, running term " + term); //$NON-NLS-1$
513 			}
514 
515 			//$FALL-THROUGH$
516 		case LEADER:
517 			committedIndex = headIndex;
518 			if (log.isDebugEnabled()) {
519 				log.debug("Committed {} in term {}", //$NON-NLS-1$
520 						committedIndex.describeForLog(),
521 						Long.valueOf(term));
522 			}
523 			nextRound();
524 			commitAsync(replica);
525 			notifySuccess(runningRound);
526 			if (log.isDebugEnabled()) {
527 				log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
528 			}
529 			break;
530 
531 		default:
532 			log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
533 			break;
534 		}
535 	}
536 
537 	private void notifySuccess(Round round) {
538 		// Drop the leader lock while notifying Proposal listeners.
539 		lock.unlock();
540 		try {
541 			round.success();
542 		} finally {
543 			lock.lock();
544 		}
545 	}
546 
547 	private void commitAsync(KetchReplica caller) {
548 		for (KetchReplica r : voters) {
549 			if (r == caller) {
550 				continue;
551 			}
552 			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
553 				r.pushCommitAsync(committedIndex);
554 			}
555 		}
556 		for (KetchReplica r : followers) {
557 			if (r == caller) {
558 				continue;
559 			}
560 			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
561 				r.pushCommitAsync(committedIndex);
562 			}
563 		}
564 	}
565 
566 	/** Schedule the next round; invoked while {@link #lock} is held. */
567 	void nextRound() {
568 		runningRound = null;
569 
570 		if (queued.isEmpty()) {
571 			idle = true;
572 		} else {
573 			// Caller holds lock. Reschedule leader on a new thread so
574 			// the call stack can unwind and lock is not held unexpectedly
575 			// during prepare for the next round.
576 			scheduleLeader();
577 		}
578 	}
579 
580 	/** @return snapshot this leader. */
581 	public LeaderSnapshot snapshot() {
582 		lock.lock();
583 		try {
584 			LeaderSnapshot s = new LeaderSnapshot();
585 			s.state = state;
586 			s.term = term;
587 			s.headIndex = headIndex;
588 			s.committedIndex = committedIndex;
589 			s.idle = isIdle();
590 			for (KetchReplica r : voters) {
591 				s.replicas.add(r.snapshot());
592 			}
593 			for (KetchReplica r : followers) {
594 				s.replicas.add(r.snapshot());
595 			}
596 			return s;
597 		} finally {
598 			lock.unlock();
599 		}
600 	}
601 
602 	/** Gracefully shutdown this leader and cancel outstanding operations. */
603 	public void shutdown() {
604 		lock.lock();
605 		try {
606 			if (state != SHUTDOWN) {
607 				state = SHUTDOWN;
608 				for (KetchReplica r : voters) {
609 					r.shutdown();
610 				}
611 				for (KetchReplica r : followers) {
612 					r.shutdown();
613 				}
614 			}
615 		} finally {
616 			lock.unlock();
617 		}
618 	}
619 
620 	@Override
621 	public String toString() {
622 		return snapshot().toString();
623 	}
624 }