View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
47  import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
48  import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
49  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
50  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
51  
52  import java.io.IOException;
53  import java.util.ArrayList;
54  import java.util.Collection;
55  import java.util.Collections;
56  import java.util.List;
57  import java.util.concurrent.CopyOnWriteArrayList;
58  import java.util.concurrent.TimeUnit;
59  import java.util.concurrent.atomic.AtomicReference;
60  
61  import org.eclipse.jgit.annotations.Nullable;
62  import org.eclipse.jgit.errors.MissingObjectException;
63  import org.eclipse.jgit.internal.storage.reftree.Command;
64  import org.eclipse.jgit.lib.ObjectId;
65  import org.eclipse.jgit.lib.PersonIdent;
66  import org.eclipse.jgit.lib.Ref;
67  import org.eclipse.jgit.revwalk.RevWalk;
68  import org.eclipse.jgit.transport.PushCertificate;
69  import org.eclipse.jgit.transport.ReceiveCommand;
70  import org.eclipse.jgit.util.time.ProposedTimestamp;
71  
72  /**
73   * A proposal to be applied in a Ketch system.
74   * <p>
75   * Pushing to a Ketch leader results in the leader making a proposal. The
76   * proposal includes the list of reference updates. The leader attempts to send
77   * the proposal to a quorum of replicas by pushing the proposal to a "staging"
78   * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
79   * then the changes are durable and the leader can commit the proposal.
80   * <p>
81   * Proposals are executed by
82   * {@link org.eclipse.jgit.internal.ketch.KetchLeader#queueProposal(Proposal)},
83   * which runs them asynchronously in the background. Proposals are thread-safe
84   * futures allowing callers to {@link #await()} for results or be notified by
85   * callback using {@link #addListener(Runnable)}.
86   */
87  public class Proposal {
88  	/** Current state of the proposal. */
89  	public enum State {
90  		/** Proposal has not yet been given to a {@link KetchLeader}. */
91  		NEW(false),
92  
93  		/**
94  		 * Proposal was validated and has entered the queue, but a round
95  		 * containing this proposal has not started yet.
96  		 */
97  		QUEUED(false),
98  
99  		/** Round containing the proposal has begun and is in progress. */
100 		RUNNING(false),
101 
102 		/**
103 		 * Proposal was executed through a round. Individual results from
104 		 * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
105 		 * the success or failure outcome.
106 		 */
107 		EXECUTED(true),
108 
109 		/** Proposal was aborted and did not reach consensus. */
110 		ABORTED(true);
111 
112 		private final boolean done;
113 
114 		private State(boolean done) {
115 			this.done = done;
116 		}
117 
118 		/** @return true if this is a terminal state. */
119 		public boolean isDone() {
120 			return done;
121 		}
122 	}
123 
124 	private final List<Command> commands;
125 	private PersonIdent author;
126 	private String message;
127 	private PushCertificate pushCert;
128 
129 	private List<ProposedTimestamp> timestamps;
130 	private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
131 	private final AtomicReference<State> state = new AtomicReference<>(NEW);
132 
133 	/**
134 	 * Create a proposal from a list of Ketch commands.
135 	 *
136 	 * @param cmds
137 	 *            prepared list of commands.
138 	 */
139 	public Proposal(List<Command> cmds) {
140 		commands = Collections.unmodifiableList(new ArrayList<>(cmds));
141 	}
142 
143 	/**
144 	 * Create a proposal from a collection of received commands.
145 	 *
146 	 * @param rw
147 	 *            walker to assist in preparing commands.
148 	 * @param cmds
149 	 *            list of pending commands.
150 	 * @throws org.eclipse.jgit.errors.MissingObjectException
151 	 *             newId of a command is not found locally.
152 	 * @throws java.io.IOException
153 	 *             local objects cannot be accessed.
154 	 */
155 	public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
156 			throws MissingObjectException, IOException {
157 		commands = asCommandList(rw, cmds);
158 	}
159 
160 	private static List<Command> asCommandList(RevWalk rw,
161 			Collection<ReceiveCommand> cmds)
162 					throws MissingObjectException, IOException {
163 		List<Command> commands = new ArrayList<>(cmds.size());
164 		for (ReceiveCommand cmd : cmds) {
165 			commands.add(new Command(rw, cmd));
166 		}
167 		return Collections.unmodifiableList(commands);
168 	}
169 
170 	/**
171 	 * Get commands from this proposal.
172 	 *
173 	 * @return commands from this proposal.
174 	 */
175 	public Collection<Command> getCommands() {
176 		return commands;
177 	}
178 
179 	/**
180 	 * Get optional author of the proposal.
181 	 *
182 	 * @return optional author of the proposal.
183 	 */
184 	@Nullable
185 	public PersonIdent getAuthor() {
186 		return author;
187 	}
188 
189 	/**
190 	 * Set the author for the proposal.
191 	 *
192 	 * @param who
193 	 *            optional identity of the author of the proposal.
194 	 * @return {@code this}
195 	 */
196 	public Proposal setAuthor(@Nullable PersonIdent who) {
197 		author = who;
198 		return this;
199 	}
200 
201 	/**
202 	 * Get optional message for the commit log of the RefTree.
203 	 *
204 	 * @return optional message for the commit log of the RefTree.
205 	 */
206 	@Nullable
207 	public String getMessage() {
208 		return message;
209 	}
210 
211 	/**
212 	 * Set the message to appear in the commit log of the RefTree.
213 	 *
214 	 * @param msg
215 	 *            message text for the commit.
216 	 * @return {@code this}
217 	 */
218 	public Proposal setMessage(@Nullable String msg) {
219 		message = msg != null && !msg.isEmpty() ? msg : null;
220 		return this;
221 	}
222 
223 	/**
224 	 * Get optional certificate signing the references.
225 	 *
226 	 * @return optional certificate signing the references.
227 	 */
228 	@Nullable
229 	public PushCertificate getPushCertificate() {
230 		return pushCert;
231 	}
232 
233 	/**
234 	 * Set the push certificate signing the references.
235 	 *
236 	 * @param cert
237 	 *            certificate, may be null.
238 	 * @return {@code this}
239 	 */
240 	public Proposal setPushCertificate(@Nullable PushCertificate cert) {
241 		pushCert = cert;
242 		return this;
243 	}
244 
245 	/**
246 	 * Get timestamps that Ketch must block for.
247 	 *
248 	 * @return timestamps that Ketch must block for. These may have been used as
249 	 *         commit times inside the objects involved in the proposal.
250 	 */
251 	public List<ProposedTimestamp> getProposedTimestamps() {
252 		if (timestamps != null) {
253 			return timestamps;
254 		}
255 		return Collections.emptyList();
256 	}
257 
258 	/**
259 	 * Request the proposal to wait for the affected timestamps to resolve.
260 	 *
261 	 * @param ts
262 	 *            a {@link org.eclipse.jgit.util.time.ProposedTimestamp} object.
263 	 * @return {@code this}.
264 	 */
265 	public Proposal addProposedTimestamp(ProposedTimestamp ts) {
266 		if (timestamps == null) {
267 			timestamps = new ArrayList<>(4);
268 		}
269 		timestamps.add(ts);
270 		return this;
271 	}
272 
273 	/**
274 	 * Add a callback to be invoked when the proposal is done.
275 	 * <p>
276 	 * A proposal is done when it has entered either
277 	 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#EXECUTED} or
278 	 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#ABORTED} state. If
279 	 * the proposal is already done {@code callback.run()} is immediately
280 	 * invoked on the caller's thread.
281 	 *
282 	 * @param callback
283 	 *            method to run after the proposal is done. The callback may be
284 	 *            run on a Ketch system thread and should be completed quickly.
285 	 */
286 	public void addListener(Runnable callback) {
287 		boolean runNow = false;
288 		synchronized (state) {
289 			if (state.get().isDone()) {
290 				runNow = true;
291 			} else {
292 				listeners.add(callback);
293 			}
294 		}
295 		if (runNow) {
296 			callback.run();
297 		}
298 	}
299 
300 	/** Set command result as OK. */
301 	void success() {
302 		for (Command c : commands) {
303 			if (c.getResult() == NOT_ATTEMPTED) {
304 				c.setResult(OK);
305 			}
306 		}
307 		notifyState(EXECUTED);
308 	}
309 
310 	/** Mark commands as "transaction aborted". */
311 	void abort() {
312 		Command.abort(commands, null);
313 		notifyState(ABORTED);
314 	}
315 
316 	/**
317 	 * Read the current state of the proposal.
318 	 *
319 	 * @return read the current state of the proposal.
320 	 */
321 	public State getState() {
322 		return state.get();
323 	}
324 
325 	/**
326 	 * Whether the proposal was attempted
327 	 *
328 	 * @return {@code true} if the proposal was attempted. A true value does not
329 	 *         mean consensus was reached, only that the proposal was considered
330 	 *         and will not be making any more progress beyond its current
331 	 *         state.
332 	 */
333 	public boolean isDone() {
334 		return state.get().isDone();
335 	}
336 
337 	/**
338 	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
339 	 *
340 	 * @throws java.lang.InterruptedException
341 	 *             caller was interrupted before proposal executed.
342 	 */
343 	public void await() throws InterruptedException {
344 		synchronized (state) {
345 			while (!state.get().isDone()) {
346 				state.wait();
347 			}
348 		}
349 	}
350 
351 	/**
352 	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
353 	 *
354 	 * @param wait
355 	 *            how long to wait.
356 	 * @param unit
357 	 *            unit describing the wait time.
358 	 * @return true if the proposal is done; false if the method timed out.
359 	 * @throws java.lang.InterruptedException
360 	 *             caller was interrupted before proposal executed.
361 	 */
362 	public boolean await(long wait, TimeUnit unit) throws InterruptedException {
363 		synchronized (state) {
364 			if (state.get().isDone()) {
365 				return true;
366 			}
367 			state.wait(unit.toMillis(wait));
368 			return state.get().isDone();
369 		}
370 	}
371 
372 	/**
373 	 * Wait for the proposal to exit a state.
374 	 *
375 	 * @param notIn
376 	 *            state the proposal should not be in to return.
377 	 * @param wait
378 	 *            how long to wait.
379 	 * @param unit
380 	 *            unit describing the wait time.
381 	 * @return true if the proposal exited the state; false on time out.
382 	 * @throws java.lang.InterruptedException
383 	 *             caller was interrupted before proposal executed.
384 	 */
385 	public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
386 			throws InterruptedException {
387 		synchronized (state) {
388 			if (state.get() != notIn) {
389 				return true;
390 			}
391 			state.wait(unit.toMillis(wait));
392 			return state.get() != notIn;
393 		}
394 	}
395 
396 	void notifyState(State s) {
397 		synchronized (state) {
398 			state.set(s);
399 			state.notifyAll();
400 		}
401 		if (s.isDone()) {
402 			for (Runnable callback : listeners) {
403 				callback.run();
404 			}
405 			listeners.clear();
406 		}
407 	}
408 
409 	/** {@inheritDoc} */
410 	@Override
411 	public String toString() {
412 		StringBuilder s = new StringBuilder();
413 		s.append("Ketch Proposal {\n"); //$NON-NLS-1$
414 		s.append("  ").append(state.get()).append('\n'); //$NON-NLS-1$
415 		if (author != null) {
416 			s.append("  author ").append(author).append('\n'); //$NON-NLS-1$
417 		}
418 		if (message != null) {
419 			s.append("  message ").append(message).append('\n'); //$NON-NLS-1$
420 		}
421 		for (Command c : commands) {
422 			s.append("  "); //$NON-NLS-1$
423 			format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
424 			s.append(' ');
425 			format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
426 			s.append(' ').append(c.getRefName());
427 			if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
428 				s.append(' ').append(c.getResult()); // $NON-NLS-1$
429 			}
430 			s.append('\n');
431 		}
432 		s.append('}');
433 		return s.toString();
434 	}
435 
436 	private static void format(StringBuilder s, @Nullable Ref r, String n) {
437 		if (r == null) {
438 			s.append(n);
439 		} else if (r.isSymbolic()) {
440 			s.append(r.getTarget().getName());
441 		} else {
442 			ObjectId id = r.getObjectId();
443 			if (id != null) {
444 				s.append(id.abbreviate(8).name());
445 			}
446 		}
447 	}
448 }