View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.CANDIDATE;
47  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.LEADER;
48  import static org.eclipse.jgit.internal.ketch.KetchLeader.State.SHUTDOWN;
49  import static org.eclipse.jgit.internal.ketch.KetchReplica.Participation.FOLLOWER_ONLY;
50  import static org.eclipse.jgit.internal.ketch.Proposal.State.QUEUED;
51  
52  import java.io.IOException;
53  import java.text.MessageFormat;
54  import java.util.ArrayList;
55  import java.util.Arrays;
56  import java.util.Collection;
57  import java.util.List;
58  import java.util.concurrent.locks.Lock;
59  import java.util.concurrent.locks.ReentrantLock;
60  
61  import org.eclipse.jgit.internal.storage.reftree.RefTree;
62  import org.eclipse.jgit.lib.ObjectId;
63  import org.eclipse.jgit.lib.Repository;
64  import org.eclipse.jgit.revwalk.RevCommit;
65  import org.eclipse.jgit.revwalk.RevWalk;
66  import org.slf4j.Logger;
67  import org.slf4j.LoggerFactory;
68  
69  /**
70   * A leader managing consensus across remote followers.
71   * <p>
72   * A leader instance starts up in
73   * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#CANDIDATE} and tries
74   * to begin a new term by sending an
75   * {@link org.eclipse.jgit.internal.ketch.ElectionRound} to all replicas. Its
76   * term starts if a majority of replicas have accepted this leader instance for
77   * the term.
78   * <p>
79   * Once elected by a majority the instance enters
80   * {@link org.eclipse.jgit.internal.ketch.KetchLeader.State#LEADER} and runs
81   * proposals offered to {@link #queueProposal(Proposal)}. This continues until
82   * the leader is timed out for inactivity, or is deposed by a competing leader
83   * gaining its own majority.
84   * <p>
85   * Once timed out or deposed this {@code KetchLeader} instance should be
86   * discarded, and a new instance takes over.
87   * <p>
88   * Each leader instance coordinates a group of
89   * {@link org.eclipse.jgit.internal.ketch.KetchReplica}s. Replica instances are
90   * owned by the leader instance and must be discarded when the leader is
91   * discarded.
92   * <p>
93   * In Ketch all push requests are issued through the leader. The steps are as
94   * follows (see {@link org.eclipse.jgit.internal.ketch.KetchPreReceive} for an
95   * example):
96   * <ul>
97   * <li>Create a {@link org.eclipse.jgit.internal.ketch.Proposal} with the
98   * {@link org.eclipse.jgit.transport.ReceiveCommand}s that represent the push.
99   * <li>Invoke {@link #queueProposal(Proposal)} on the leader instance.
100  * <li>Wait for consensus with
101  * {@link org.eclipse.jgit.internal.ketch.Proposal#await()}.
102  * <li>To examine the status of the push, check
103  * {@link org.eclipse.jgit.internal.ketch.Proposal#getCommands()}, looking at
104  * {@link org.eclipse.jgit.internal.storage.reftree.Command#getResult()}.
105  * </ul>
106  * <p>
107  * The leader gains consensus by first pushing the needed objects and a
108  * {@link org.eclipse.jgit.internal.storage.reftree.RefTree} representing the
109  * desired target repository state to the {@code refs/txn/accepted} branch on
110  * each of the replicas. Once a majority has succeeded, the leader commits the
111  * state by either pushing the {@code refs/txn/accepted} value to
112  * {@code refs/txn/committed} (for Ketch-aware replicas) or by pushing updates
113  * to {@code refs/heads/master}, etc. for stock Git replicas.
114  * <p>
115  * Internally, the actual transport to replicas is performed on background
116  * threads via the {@link org.eclipse.jgit.internal.ketch.KetchSystem}'s
117  * executor service. For performance, the
118  * {@link org.eclipse.jgit.internal.ketch.KetchLeader},
119  * {@link org.eclipse.jgit.internal.ketch.KetchReplica} and
120  * {@link org.eclipse.jgit.internal.ketch.Proposal} objects share some state,
121  * and may invoke each other's methods on different threads. This access is
122  * protected by the leader's {@link #lock} object. Care must be taken to prevent
123  * concurrent access by correctly obtaining the leader's lock.
124  */
125 public abstract class KetchLeader {
126 	private static final Logger log = LoggerFactory.getLogger(KetchLeader.class);
127 
128 	/** Current state of the leader instance. */
129 	public static enum State {
130 		/** Newly created instance trying to elect itself leader. */
131 		CANDIDATE,
132 
133 		/** Leader instance elected by a majority. */
134 		LEADER,
135 
136 		/** Instance has been deposed by another with a more recent term. */
137 		DEPOSED,
138 
139 		/** Leader has been gracefully shutdown, e.g. due to inactivity. */
140 		SHUTDOWN;
141 	}
142 
	/**
	 * Ketch system handed to the constructor; its executor is used to run
	 * the leader on a background thread.
	 */
	private final KetchSystem system;

	/**
	 * Leader's knowledge of replicas for this repository.
	 * <p>
	 * Holds the replicas whose participation is {@code FULL}; the quorum size
	 * is computed from the length of this array. Remains {@code null} until
	 * {@link #setReplicas(Collection)} has been called.
	 */
	private KetchReplica[] voters;

	/**
	 * Replicas whose participation is {@code FOLLOWER_ONLY}. They receive the
	 * same pushes as voters but are skipped when votes are tallied. Remains
	 * {@code null} until {@link #setReplicas(Collection)} has been called.
	 */
	private KetchReplica[] followers;

	/**
	 * The {@link LocalReplica} found among the voters by
	 * {@link #setReplicas(Collection)}; used to read the locally accepted
	 * state when the leader initializes.
	 */
	private LocalReplica self;

	/**
	 * Lock protecting all data within this leader instance.
	 * <p>
	 * This lock extends into the {@link KetchReplica} instances used by the
	 * leader. They share the same lock instance to simplify concurrency.
	 */
	final Lock lock;

	/**
	 * Current lifecycle state. Starts as {@code CANDIDATE}, becomes
	 * {@code LEADER} once a quorum accepts the election round, and becomes
	 * {@code SHUTDOWN} when {@link #shutdown()} is invoked.
	 */
	private State state = CANDIDATE;

	/** Term of this leader, once elected. */
	private long term;

	/**
	 * Pending proposals accepted into the queue in FIFO order.
	 * <p>
	 * These proposals were preflighted and do not contain any conflicts with
	 * each other and their expectations matched the leader's local view of the
	 * agreed upon {@code refs/txn/accepted} tree.
	 */
	private final List<Proposal> queued;

	/**
	 * State of the repository's RefTree after applying all entries in
	 * {@link #queued}. New proposals must be consistent with this tree to be
	 * appended to the end of {@link #queued}.
	 * <p>
	 * Must be deep-copied with {@link RefTree#copy()} if
	 * {@link #roundHoldsReferenceToRefTree} is {@code true}.
	 * <p>
	 * Remains {@code null} until the first proposal is queued, at which point
	 * it is loaded from the local replica.
	 */
	private RefTree refTree;

	/**
	 * If {@code true} {@link #refTree} must be duplicated before queuing the
	 * next proposal. The {@link #refTree} was passed into the constructor of a
	 * {@link ProposalRound}, and that external reference to the {@link RefTree}
	 * object is held by the proposal until it materializes the tree object in
	 * the object store. This field is set {@code true} when the proposal begins
	 * execution and set {@code false} once tree objects are persisted in the
	 * local repository's object store or {@link #refTree} is replaced with a
	 * copy to isolate it from any running rounds.
	 * <p>
	 * If proposals arrive less frequently than the {@code RefTree} is written
	 * out to the repository the {@link #roundHoldsReferenceToRefTree} behavior
	 * avoids duplicating {@link #refTree}, reducing both time and memory used.
	 * However if proposals arrive more frequently {@link #refTree} must be
	 * duplicated to prevent newly queued proposals from corrupting the
	 * {@link #runningRound}.
	 */
	volatile boolean roundHoldsReferenceToRefTree;

	/** End of the leader's log. */
	private LogIndex headIndex;

	/** Leader knows this (and all prior) states are committed. */
	private LogIndex committedIndex;

	/**
	 * Is the leader idle with no work pending? If {@code true} there is no work
	 * for the leader (normal state). This field is {@code false} when the
	 * leader thread is scheduled for execution, or while {@link #runningRound}
	 * defines a round in progress.
	 */
	private boolean idle;

	/**
	 * Current round the leader is preparing and waiting for a vote on;
	 * {@code null} when no round is in progress.
	 */
	private Round runningRound;
217 
218 	/**
219 	 * Construct a leader for a Ketch instance.
220 	 *
221 	 * @param system
222 	 *            Ketch system configuration the leader must adhere to.
223 	 */
224 	protected KetchLeader(KetchSystem system) {
225 		this.system = system;
226 		this.lock = new ReentrantLock(true /* fair */);
227 		this.queued = new ArrayList<>(4);
228 		this.idle = true;
229 	}
230 
231 	/** @return system configuration. */
232 	KetchSystem getSystem() {
233 		return system;
234 	}
235 
236 	/**
237 	 * Configure the replicas used by this Ketch instance.
238 	 * <p>
239 	 * Replicas should be configured once at creation before any proposals are
240 	 * executed. Once elections happen, <b>reconfiguration is a complicated
241 	 * concept that is not currently supported</b>.
242 	 *
243 	 * @param replicas
244 	 *            members participating with the same repository.
245 	 */
246 	public void setReplicas(Collection<KetchReplica> replicas) {
247 		List<KetchReplica> v = new ArrayList<>(5);
248 		List<KetchReplica> f = new ArrayList<>(5);
249 		for (KetchReplica r : replicas) {
250 			switch (r.getParticipation()) {
251 			case FULL:
252 				v.add(r);
253 				break;
254 
255 			case FOLLOWER_ONLY:
256 				f.add(r);
257 				break;
258 			}
259 		}
260 
261 		Collection<Integer> validVoters = validVoterCounts();
262 		if (!validVoters.contains(Integer.valueOf(v.size()))) {
263 			throw new IllegalArgumentException(MessageFormat.format(
264 					KetchText.get().unsupportedVoterCount,
265 					Integer.valueOf(v.size()),
266 					validVoters));
267 		}
268 
269 		LocalReplica me = findLocal(v);
270 		if (me == null) {
271 			throw new IllegalArgumentException(
272 					KetchText.get().localReplicaRequired);
273 		}
274 
275 		lock.lock();
276 		try {
277 			voters = v.toArray(new KetchReplica[v.size()]);
278 			followers = f.toArray(new KetchReplica[f.size()]);
279 			self = me;
280 		} finally {
281 			lock.unlock();
282 		}
283 	}
284 
285 	private static Collection<Integer> validVoterCounts() {
286 		@SuppressWarnings("boxing")
287 		Integer[] valid = {
288 				// An odd number of voting replicas is required.
289 				1, 3, 5, 7, 9 };
290 		return Arrays.asList(valid);
291 	}
292 
293 	private static LocalReplica findLocal(Collection<KetchReplica> voters) {
294 		for (KetchReplica r : voters) {
295 			if (r instanceof LocalReplica) {
296 				return (LocalReplica) r;
297 			}
298 		}
299 		return null;
300 	}
301 
302 	/**
303 	 * Get an instance of the repository for use by a leader thread.
304 	 * <p>
305 	 * The caller will close the repository.
306 	 *
307 	 * @return opened repository for use by the leader thread.
308 	 * @throws java.io.IOException
309 	 *             cannot reopen the repository for the leader.
310 	 */
311 	protected abstract Repository openRepository() throws IOException;
312 
313 	/**
314 	 * Queue a reference update proposal for consensus.
315 	 * <p>
316 	 * This method does not wait for consensus to be reached. The proposal is
317 	 * checked to look for risks of conflicts, and then submitted into the queue
318 	 * for distribution as soon as possible.
319 	 * <p>
320 	 * Callers must use {@link org.eclipse.jgit.internal.ketch.Proposal#await()}
321 	 * to see if the proposal is done.
322 	 *
323 	 * @param proposal
324 	 *            the proposed reference updates to queue for consideration.
325 	 *            Once execution is complete the individual reference result
326 	 *            fields will be populated with the outcome.
327 	 * @throws java.lang.InterruptedException
328 	 *             current thread was interrupted. The proposal may have been
329 	 *             aborted if it was not yet queued for execution.
330 	 * @throws java.io.IOException
331 	 *             unrecoverable error preventing proposals from being attempted
332 	 *             by this leader.
333 	 */
334 	public void queueProposal(Proposal proposal)
335 			throws InterruptedException, IOException {
336 		try {
337 			lock.lockInterruptibly();
338 		} catch (InterruptedException e) {
339 			proposal.abort();
340 			throw e;
341 		}
342 		try {
343 			if (refTree == null) {
344 				initialize();
345 				for (Proposal p : queued) {
346 					refTree.apply(p.getCommands());
347 				}
348 			} else if (roundHoldsReferenceToRefTree) {
349 				refTree = refTree.copy();
350 				roundHoldsReferenceToRefTree = false;
351 			}
352 
353 			if (!refTree.apply(proposal.getCommands())) {
354 				// A conflict exists so abort the proposal.
355 				proposal.abort();
356 				return;
357 			}
358 
359 			queued.add(proposal);
360 			proposal.notifyState(QUEUED);
361 
362 			if (idle) {
363 				scheduleLeader();
364 			}
365 		} finally {
366 			lock.unlock();
367 		}
368 	}
369 
370 	private void initialize() throws IOException {
371 		try (Repository git = openRepository(); RevWalk rw = new RevWalk(git)) {
372 			self.initialize(git);
373 
374 			ObjectId accepted = self.getTxnAccepted();
375 			if (!ObjectId.zeroId().equals(accepted)) {
376 				RevCommit c = rw.parseCommit(accepted);
377 				headIndex = LogIndex.unknown(accepted);
378 				refTree = RefTree.read(rw.getObjectReader(), c.getTree());
379 			} else {
380 				headIndex = LogIndex.unknown(ObjectId.zeroId());
381 				refTree = RefTree.newEmptyTree();
382 			}
383 		}
384 	}
385 
386 	private void scheduleLeader() {
387 		idle = false;
388 		system.getExecutor().execute(new Runnable() {
389 			@Override
390 			public void run() {
391 				runLeader();
392 			}
393 		});
394 	}
395 
396 	private void runLeader() {
397 		Round round;
398 		lock.lock();
399 		try {
400 			switch (state) {
401 			case CANDIDATE:
402 				round = new ElectionRound(this, headIndex);
403 				break;
404 
405 			case LEADER:
406 				round = newProposalRound();
407 				break;
408 
409 			case DEPOSED:
410 			case SHUTDOWN:
411 			default:
412 				log.warn("Leader cannot run {}", state); //$NON-NLS-1$
413 				// TODO(sop): Redirect proposals.
414 				return;
415 			}
416 		} finally {
417 			lock.unlock();
418 		}
419 
420 		try {
421 			round.start();
422 		} catch (IOException e) {
423 			// TODO(sop) Depose leader if it cannot use its repository.
424 			log.error(KetchText.get().leaderFailedToStore, e);
425 			lock.lock();
426 			try {
427 				nextRound();
428 			} finally {
429 				lock.unlock();
430 			}
431 		}
432 	}
433 
434 	private ProposalRound newProposalRound() {
435 		List<Proposal> todo = new ArrayList<>(queued);
436 		queued.clear();
437 		roundHoldsReferenceToRefTree = true;
438 		return new ProposalRound(this, headIndex, todo, refTree);
439 	}
440 
441 	/** @return term of this leader's reign. */
442 	long getTerm() {
443 		return term;
444 	}
445 
446 	/** @return end of the leader's log. */
447 	LogIndex getHead() {
448 		return headIndex;
449 	}
450 
451 	/**
452 	 * @return state leader knows it has committed across a quorum of replicas.
453 	 */
454 	LogIndex getCommitted() {
455 		return committedIndex;
456 	}
457 
458 	boolean isIdle() {
459 		return idle;
460 	}
461 
462 	void runAsync(Round round) {
463 		lock.lock();
464 		try {
465 			// End of the log is this round. Once transport begins it is
466 			// reasonable to assume at least one replica will eventually get
467 			// this, and there is reasonable probability it commits.
468 			headIndex = round.acceptedNewIndex;
469 			runningRound = round;
470 
471 			for (KetchReplica replica : voters) {
472 				replica.pushTxnAcceptedAsync(round);
473 			}
474 			for (KetchReplica replica : followers) {
475 				replica.pushTxnAcceptedAsync(round);
476 			}
477 		} finally {
478 			lock.unlock();
479 		}
480 	}
481 
482 	/**
483 	 * Asynchronous signal from a replica after completion.
484 	 * <p>
485 	 * Must be called while {@link #lock} is held by the replica.
486 	 *
487 	 * @param replica
488 	 *            replica posting a completion event.
489 	 */
490 	void onReplicaUpdate(KetchReplica replica) {
491 		if (log.isDebugEnabled()) {
492 			log.debug("Replica {} finished:\n{}", //$NON-NLS-1$
493 					replica.describeForLog(), snapshot());
494 		}
495 
496 		if (replica.getParticipation() == FOLLOWER_ONLY) {
497 			// Followers cannot vote, so votes haven't changed.
498 			return;
499 		} else if (runningRound == null) {
500 			// No round running, no need to tally votes.
501 			return;
502 		}
503 
504 		assert headIndex.equals(runningRound.acceptedNewIndex);
505 		int matching = 0;
506 		for (KetchReplica r : voters) {
507 			if (r.hasAccepted(headIndex)) {
508 				matching++;
509 			}
510 		}
511 
512 		int quorum = voters.length / 2 + 1;
513 		boolean success = matching >= quorum;
514 		if (!success) {
515 			return;
516 		}
517 
518 		switch (state) {
519 		case CANDIDATE:
520 			term = ((ElectionRound) runningRound).getTerm();
521 			state = LEADER;
522 			if (log.isDebugEnabled()) {
523 				log.debug("Won election, running term " + term); //$NON-NLS-1$
524 			}
525 
526 			//$FALL-THROUGH$
527 		case LEADER:
528 			committedIndex = headIndex;
529 			if (log.isDebugEnabled()) {
530 				log.debug("Committed {} in term {}", //$NON-NLS-1$
531 						committedIndex.describeForLog(),
532 						Long.valueOf(term));
533 			}
534 			nextRound();
535 			commitAsync(replica);
536 			notifySuccess(runningRound);
537 			if (log.isDebugEnabled()) {
538 				log.debug("Leader state:\n{}", snapshot()); //$NON-NLS-1$
539 			}
540 			break;
541 
542 		default:
543 			log.debug("Leader ignoring replica while in {}", state); //$NON-NLS-1$
544 			break;
545 		}
546 	}
547 
548 	private void notifySuccess(Round round) {
549 		// Drop the leader lock while notifying Proposal listeners.
550 		lock.unlock();
551 		try {
552 			round.success();
553 		} finally {
554 			lock.lock();
555 		}
556 	}
557 
558 	private void commitAsync(KetchReplica caller) {
559 		for (KetchReplica r : voters) {
560 			if (r == caller) {
561 				continue;
562 			}
563 			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
564 				r.pushCommitAsync(committedIndex);
565 			}
566 		}
567 		for (KetchReplica r : followers) {
568 			if (r == caller) {
569 				continue;
570 			}
571 			if (r.shouldPushUnbatchedCommit(committedIndex, isIdle())) {
572 				r.pushCommitAsync(committedIndex);
573 			}
574 		}
575 	}
576 
577 	/** Schedule the next round; invoked while {@link #lock} is held. */
578 	void nextRound() {
579 		runningRound = null;
580 
581 		if (queued.isEmpty()) {
582 			idle = true;
583 		} else {
584 			// Caller holds lock. Reschedule leader on a new thread so
585 			// the call stack can unwind and lock is not held unexpectedly
586 			// during prepare for the next round.
587 			scheduleLeader();
588 		}
589 	}
590 
591 	/**
592 	 * Snapshot this leader
593 	 *
594 	 * @return snapshot of this leader
595 	 */
596 	public LeaderSnapshot snapshot() {
597 		lock.lock();
598 		try {
599 			LeaderSnapshot s = new LeaderSnapshot();
600 			s.state = state;
601 			s.term = term;
602 			s.headIndex = headIndex;
603 			s.committedIndex = committedIndex;
604 			s.idle = isIdle();
605 			for (KetchReplica r : voters) {
606 				s.replicas.add(r.snapshot());
607 			}
608 			for (KetchReplica r : followers) {
609 				s.replicas.add(r.snapshot());
610 			}
611 			return s;
612 		} finally {
613 			lock.unlock();
614 		}
615 	}
616 
617 	/**
618 	 * Gracefully shutdown this leader and cancel outstanding operations.
619 	 */
620 	public void shutdown() {
621 		lock.lock();
622 		try {
623 			if (state != SHUTDOWN) {
624 				state = SHUTDOWN;
625 				for (KetchReplica r : voters) {
626 					r.shutdown();
627 				}
628 				for (KetchReplica r : followers) {
629 					r.shutdown();
630 				}
631 			}
632 		} finally {
633 			lock.unlock();
634 		}
635 	}
636 
637 	/** {@inheritDoc} */
638 	@Override
639 	public String toString() {
640 		return snapshot().toString();
641 	}
642 }