View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.ketch;
12  
13  import static org.eclipse.jgit.internal.ketch.Proposal.State.ABORTED;
14  import static org.eclipse.jgit.internal.ketch.Proposal.State.EXECUTED;
15  import static org.eclipse.jgit.internal.ketch.Proposal.State.NEW;
16  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
17  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
18  
19  import java.io.IOException;
20  import java.util.ArrayList;
21  import java.util.Collection;
22  import java.util.Collections;
23  import java.util.List;
24  import java.util.concurrent.CopyOnWriteArrayList;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import org.eclipse.jgit.annotations.Nullable;
29  import org.eclipse.jgit.errors.MissingObjectException;
30  import org.eclipse.jgit.internal.storage.reftree.Command;
31  import org.eclipse.jgit.lib.ObjectId;
32  import org.eclipse.jgit.lib.PersonIdent;
33  import org.eclipse.jgit.lib.Ref;
34  import org.eclipse.jgit.revwalk.RevWalk;
35  import org.eclipse.jgit.transport.PushCertificate;
36  import org.eclipse.jgit.transport.ReceiveCommand;
37  import org.eclipse.jgit.util.time.ProposedTimestamp;
38  
39  /**
40   * A proposal to be applied in a Ketch system.
41   * <p>
42   * Pushing to a Ketch leader results in the leader making a proposal. The
43   * proposal includes the list of reference updates. The leader attempts to send
44   * the proposal to a quorum of replicas by pushing the proposal to a "staging"
45   * area under the {@code refs/txn/stage/} namespace. If the proposal succeeds
46   * then the changes are durable and the leader can commit the proposal.
47   * <p>
48   * Proposals are executed by
49   * {@link org.eclipse.jgit.internal.ketch.KetchLeader#queueProposal(Proposal)},
50   * which runs them asynchronously in the background. Proposals are thread-safe
51   * futures allowing callers to {@link #await()} for results or be notified by
52   * callback using {@link #addListener(Runnable)}.
53   */
54  public class Proposal {
55  	/** Current state of the proposal. */
56  	public enum State {
57  		/** Proposal has not yet been given to a {@link KetchLeader}. */
58  		NEW(false),
59  
60  		/**
61  		 * Proposal was validated and has entered the queue, but a round
62  		 * containing this proposal has not started yet.
63  		 */
64  		QUEUED(false),
65  
66  		/** Round containing the proposal has begun and is in progress. */
67  		RUNNING(false),
68  
69  		/**
70  		 * Proposal was executed through a round. Individual results from
71  		 * {@link Proposal#getCommands()}, {@link Command#getResult()} explain
72  		 * the success or failure outcome.
73  		 */
74  		EXECUTED(true),
75  
76  		/** Proposal was aborted and did not reach consensus. */
77  		ABORTED(true);
78  
79  		private final boolean done;
80  
81  		private State(boolean done) {
82  			this.done = done;
83  		}
84  
85  		/** @return true if this is a terminal state. */
86  		public boolean isDone() {
87  			return done;
88  		}
89  	}
90  
91  	private final List<Command> commands;
92  	private PersonIdent author;
93  	private String message;
94  	private PushCertificate pushCert;
95  
96  	private List<ProposedTimestamp> timestamps;
97  	private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
98  	private final AtomicReference<State> state = new AtomicReference<>(NEW);
99  
100 	/**
101 	 * Create a proposal from a list of Ketch commands.
102 	 *
103 	 * @param cmds
104 	 *            prepared list of commands.
105 	 */
106 	public Proposal(List<Command> cmds) {
107 		commands = Collections.unmodifiableList(new ArrayList<>(cmds));
108 	}
109 
110 	/**
111 	 * Create a proposal from a collection of received commands.
112 	 *
113 	 * @param rw
114 	 *            walker to assist in preparing commands.
115 	 * @param cmds
116 	 *            list of pending commands.
117 	 * @throws org.eclipse.jgit.errors.MissingObjectException
118 	 *             newId of a command is not found locally.
119 	 * @throws java.io.IOException
120 	 *             local objects cannot be accessed.
121 	 */
122 	public Proposal(RevWalk rw, Collection<ReceiveCommand> cmds)
123 			throws MissingObjectException, IOException {
124 		commands = asCommandList(rw, cmds);
125 	}
126 
127 	private static List<Command> asCommandList(RevWalk rw,
128 			Collection<ReceiveCommand> cmds)
129 					throws MissingObjectException, IOException {
130 		List<Command> commands = new ArrayList<>(cmds.size());
131 		for (ReceiveCommand cmd : cmds) {
132 			commands.add(new Command(rw, cmd));
133 		}
134 		return Collections.unmodifiableList(commands);
135 	}
136 
137 	/**
138 	 * Get commands from this proposal.
139 	 *
140 	 * @return commands from this proposal.
141 	 */
142 	public Collection<Command> getCommands() {
143 		return commands;
144 	}
145 
146 	/**
147 	 * Get optional author of the proposal.
148 	 *
149 	 * @return optional author of the proposal.
150 	 */
151 	@Nullable
152 	public PersonIdent getAuthor() {
153 		return author;
154 	}
155 
156 	/**
157 	 * Set the author for the proposal.
158 	 *
159 	 * @param who
160 	 *            optional identity of the author of the proposal.
161 	 * @return {@code this}
162 	 */
163 	public Proposal setAuthor(@Nullable PersonIdent who) {
164 		author = who;
165 		return this;
166 	}
167 
168 	/**
169 	 * Get optional message for the commit log of the RefTree.
170 	 *
171 	 * @return optional message for the commit log of the RefTree.
172 	 */
173 	@Nullable
174 	public String getMessage() {
175 		return message;
176 	}
177 
178 	/**
179 	 * Set the message to appear in the commit log of the RefTree.
180 	 *
181 	 * @param msg
182 	 *            message text for the commit.
183 	 * @return {@code this}
184 	 */
185 	public Proposal setMessage(@Nullable String msg) {
186 		message = msg != null && !msg.isEmpty() ? msg : null;
187 		return this;
188 	}
189 
190 	/**
191 	 * Get optional certificate signing the references.
192 	 *
193 	 * @return optional certificate signing the references.
194 	 */
195 	@Nullable
196 	public PushCertificate getPushCertificate() {
197 		return pushCert;
198 	}
199 
200 	/**
201 	 * Set the push certificate signing the references.
202 	 *
203 	 * @param cert
204 	 *            certificate, may be null.
205 	 * @return {@code this}
206 	 */
207 	public Proposal setPushCertificate(@Nullable PushCertificate cert) {
208 		pushCert = cert;
209 		return this;
210 	}
211 
212 	/**
213 	 * Get timestamps that Ketch must block for.
214 	 *
215 	 * @return timestamps that Ketch must block for. These may have been used as
216 	 *         commit times inside the objects involved in the proposal.
217 	 */
218 	public List<ProposedTimestamp> getProposedTimestamps() {
219 		if (timestamps != null) {
220 			return timestamps;
221 		}
222 		return Collections.emptyList();
223 	}
224 
225 	/**
226 	 * Request the proposal to wait for the affected timestamps to resolve.
227 	 *
228 	 * @param ts
229 	 *            a {@link org.eclipse.jgit.util.time.ProposedTimestamp} object.
230 	 * @return {@code this}.
231 	 */
232 	public Proposal addProposedTimestamp(ProposedTimestamp ts) {
233 		if (timestamps == null) {
234 			timestamps = new ArrayList<>(4);
235 		}
236 		timestamps.add(ts);
237 		return this;
238 	}
239 
240 	/**
241 	 * Add a callback to be invoked when the proposal is done.
242 	 * <p>
243 	 * A proposal is done when it has entered either
244 	 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#EXECUTED} or
245 	 * {@link org.eclipse.jgit.internal.ketch.Proposal.State#ABORTED} state. If
246 	 * the proposal is already done {@code callback.run()} is immediately
247 	 * invoked on the caller's thread.
248 	 *
249 	 * @param callback
250 	 *            method to run after the proposal is done. The callback may be
251 	 *            run on a Ketch system thread and should be completed quickly.
252 	 */
253 	public void addListener(Runnable callback) {
254 		boolean runNow = false;
255 		synchronized (state) {
256 			if (state.get().isDone()) {
257 				runNow = true;
258 			} else {
259 				listeners.add(callback);
260 			}
261 		}
262 		if (runNow) {
263 			callback.run();
264 		}
265 	}
266 
267 	/** Set command result as OK. */
268 	void success() {
269 		for (Command c : commands) {
270 			if (c.getResult() == NOT_ATTEMPTED) {
271 				c.setResult(OK);
272 			}
273 		}
274 		notifyState(EXECUTED);
275 	}
276 
277 	/** Mark commands as "transaction aborted". */
278 	void abort() {
279 		Command.abort(commands, null);
280 		notifyState(ABORTED);
281 	}
282 
283 	/**
284 	 * Read the current state of the proposal.
285 	 *
286 	 * @return read the current state of the proposal.
287 	 */
288 	public State getState() {
289 		return state.get();
290 	}
291 
292 	/**
293 	 * Whether the proposal was attempted
294 	 *
295 	 * @return {@code true} if the proposal was attempted. A true value does not
296 	 *         mean consensus was reached, only that the proposal was considered
297 	 *         and will not be making any more progress beyond its current
298 	 *         state.
299 	 */
300 	public boolean isDone() {
301 		return state.get().isDone();
302 	}
303 
304 	/**
305 	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
306 	 *
307 	 * @throws java.lang.InterruptedException
308 	 *             caller was interrupted before proposal executed.
309 	 */
310 	public void await() throws InterruptedException {
311 		synchronized (state) {
312 			while (!state.get().isDone()) {
313 				state.wait();
314 			}
315 		}
316 	}
317 
318 	/**
319 	 * Wait for the proposal to be attempted and {@link #isDone()} to be true.
320 	 *
321 	 * @param wait
322 	 *            how long to wait.
323 	 * @param unit
324 	 *            unit describing the wait time.
325 	 * @return true if the proposal is done; false if the method timed out.
326 	 * @throws java.lang.InterruptedException
327 	 *             caller was interrupted before proposal executed.
328 	 */
329 	public boolean await(long wait, TimeUnit unit) throws InterruptedException {
330 		synchronized (state) {
331 			if (state.get().isDone()) {
332 				return true;
333 			}
334 			state.wait(unit.toMillis(wait));
335 			return state.get().isDone();
336 		}
337 	}
338 
339 	/**
340 	 * Wait for the proposal to exit a state.
341 	 *
342 	 * @param notIn
343 	 *            state the proposal should not be in to return.
344 	 * @param wait
345 	 *            how long to wait.
346 	 * @param unit
347 	 *            unit describing the wait time.
348 	 * @return true if the proposal exited the state; false on time out.
349 	 * @throws java.lang.InterruptedException
350 	 *             caller was interrupted before proposal executed.
351 	 */
352 	public boolean awaitStateChange(State notIn, long wait, TimeUnit unit)
353 			throws InterruptedException {
354 		synchronized (state) {
355 			if (state.get() != notIn) {
356 				return true;
357 			}
358 			state.wait(unit.toMillis(wait));
359 			return state.get() != notIn;
360 		}
361 	}
362 
363 	void notifyState(State s) {
364 		synchronized (state) {
365 			state.set(s);
366 			state.notifyAll();
367 		}
368 		if (s.isDone()) {
369 			for (Runnable callback : listeners) {
370 				callback.run();
371 			}
372 			listeners.clear();
373 		}
374 	}
375 
376 	/** {@inheritDoc} */
377 	@Override
378 	public String toString() {
379 		StringBuilder s = new StringBuilder();
380 		s.append("Ketch Proposal {\n"); //$NON-NLS-1$
381 		s.append("  ").append(state.get()).append('\n'); //$NON-NLS-1$
382 		if (author != null) {
383 			s.append("  author ").append(author).append('\n'); //$NON-NLS-1$
384 		}
385 		if (message != null) {
386 			s.append("  message ").append(message).append('\n'); //$NON-NLS-1$
387 		}
388 		for (Command c : commands) {
389 			s.append("  "); //$NON-NLS-1$
390 			format(s, c.getOldRef(), "CREATE"); //$NON-NLS-1$
391 			s.append(' ');
392 			format(s, c.getNewRef(), "DELETE"); //$NON-NLS-1$
393 			s.append(' ').append(c.getRefName());
394 			if (c.getResult() != ReceiveCommand.Result.NOT_ATTEMPTED) {
395 				s.append(' ').append(c.getResult()); // $NON-NLS-1$
396 			}
397 			s.append('\n');
398 		}
399 		s.append('}');
400 		return s.toString();
401 	}
402 
403 	private static void format(StringBuilder s, @Nullable Ref r, String n) {
404 		if (r == null) {
405 			s.append(n);
406 		} else if (r.isSymbolic()) {
407 			s.append(r.getTarget().getName());
408 		} else {
409 			ObjectId id = r.getObjectId();
410 			if (id != null) {
411 				s.append(id.abbreviate(8).name());
412 			}
413 		}
414 	}
415 }