View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.Proposal.State.RUNNING;
47  
48  import java.io.IOException;
49  import java.time.Duration;
50  import java.util.ArrayList;
51  import java.util.Collections;
52  import java.util.HashMap;
53  import java.util.HashSet;
54  import java.util.List;
55  import java.util.Map;
56  import java.util.Set;
57  import java.util.concurrent.TimeoutException;
58  import java.util.stream.Collectors;
59  
60  import org.eclipse.jgit.annotations.Nullable;
61  import org.eclipse.jgit.internal.storage.reftree.Command;
62  import org.eclipse.jgit.internal.storage.reftree.RefTree;
63  import org.eclipse.jgit.lib.CommitBuilder;
64  import org.eclipse.jgit.lib.ObjectId;
65  import org.eclipse.jgit.lib.ObjectInserter;
66  import org.eclipse.jgit.lib.PersonIdent;
67  import org.eclipse.jgit.lib.Ref;
68  import org.eclipse.jgit.lib.Repository;
69  import org.eclipse.jgit.revwalk.RevCommit;
70  import org.eclipse.jgit.revwalk.RevWalk;
71  import org.eclipse.jgit.transport.ReceiveCommand;
72  import org.eclipse.jgit.util.time.ProposedTimestamp;
73  
74  /** A {@link Round} that aggregates and sends user {@link Proposal}s. */
75  class ProposalRound extends Round {
76  	private final List<Proposal> todo;
77  	private RefTree queuedTree;
78  
79  	ProposalRound(KetchLeader leader, LogIndex head, List<Proposal> todo,
80  			@Nullable RefTree tree) {
81  		super(leader, head);
82  		this.todo = todo;
83  
84  		if (tree != null && canCombine(todo)) {
85  			this.queuedTree = tree;
86  		} else {
87  			leader.roundHoldsReferenceToRefTree = false;
88  		}
89  	}
90  
91  	private static boolean canCombine(List<Proposal> todo) {
92  		Proposal first = todo.get(0);
93  		for (int i = 1; i < todo.size(); i++) {
94  			if (!canCombine(first, todo.get(i))) {
95  				return false;
96  			}
97  		}
98  		return true;
99  	}
100 
101 	private static boolean canCombine(Proposal a, Proposal b) {
102 		String aMsg = nullToEmpty(a.getMessage());
103 		String bMsg = nullToEmpty(b.getMessage());
104 		return aMsg.equals(bMsg) && canCombine(a.getAuthor(), b.getAuthor());
105 	}
106 
107 	private static String nullToEmpty(@Nullable String str) {
108 		return str != null ? str : ""; //$NON-NLS-1$
109 	}
110 
111 	private static boolean canCombine(@Nullable PersonIdent a,
112 			@Nullable PersonIdent b) {
113 		if (a != null && b != null) {
114 			// Same name and email address. Combine timestamp as the two
115 			// proposals are running concurrently and appear together or
116 			// not at all from the point of view of an outside reader.
117 			return a.getName().equals(b.getName())
118 					&& a.getEmailAddress().equals(b.getEmailAddress());
119 		}
120 
121 		// If a and b are null, both will be the system identity.
122 		return a == null && b == null;
123 	}
124 
125 	@Override
126 	void start() throws IOException {
127 		for (Proposal p : todo) {
128 			p.notifyState(RUNNING);
129 		}
130 		try {
131 			ObjectId id;
132 			try (Repository git = leader.openRepository();
133 					ProposedTimestamp ts = getSystem().getClock().propose()) {
134 				id = insertProposals(git, ts);
135 				blockUntil(ts);
136 			}
137 			runAsync(id);
138 		} catch (NoOp e) {
139 			for (Proposal p : todo) {
140 				p.success();
141 			}
142 			leader.lock.lock();
143 			try {
144 				leader.nextRound();
145 			} finally {
146 				leader.lock.unlock();
147 			}
148 		} catch (IOException e) {
149 			abort();
150 			throw e;
151 		}
152 	}
153 
154 	private ObjectId insertProposals(Repository git, ProposedTimestamp ts)
155 			throws IOException, NoOp {
156 		ObjectId id;
157 		try (ObjectInserter inserter = git.newObjectInserter()) {
158 			// TODO(sop) Process signed push certificates.
159 
160 			if (queuedTree != null) {
161 				id = insertSingleProposal(git, ts, inserter);
162 			} else {
163 				id = insertMultiProposal(git, ts, inserter);
164 			}
165 
166 			stageCommands = makeStageList(git, inserter);
167 			inserter.flush();
168 		}
169 		return id;
170 	}
171 
172 	private ObjectId insertSingleProposal(Repository git, ProposedTimestamp ts,
173 			ObjectInserter inserter) throws IOException, NoOp {
174 		// Fast path: tree is passed in with all proposals applied.
175 		ObjectId treeId = queuedTree.writeTree(inserter);
176 		queuedTree = null;
177 		leader.roundHoldsReferenceToRefTree = false;
178 
179 		if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
180 			try (RevWalk rw = new RevWalk(git)) {
181 				RevCommit c = rw.parseCommit(acceptedOldIndex);
182 				if (treeId.equals(c.getTree())) {
183 					throw new NoOp();
184 				}
185 			}
186 		}
187 
188 		Proposal p = todo.get(0);
189 		CommitBuilder b = new CommitBuilder();
190 		b.setTreeId(treeId);
191 		if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
192 			b.setParentId(acceptedOldIndex);
193 		}
194 		b.setCommitter(leader.getSystem().newCommitter(ts));
195 		b.setAuthor(p.getAuthor() != null ? p.getAuthor() : b.getCommitter());
196 		b.setMessage(message(p));
197 		return inserter.insert(b);
198 	}
199 
200 	private ObjectId insertMultiProposal(Repository git, ProposedTimestamp ts,
201 			ObjectInserter inserter) throws IOException, NoOp {
202 		// The tree was not passed in, or there are multiple proposals
203 		// each needing their own commit. Reset the tree and replay each
204 		// proposal in order as individual commits.
205 		ObjectId lastIndex = acceptedOldIndex;
206 		ObjectId oldTreeId;
207 		RefTree tree;
208 		if (ObjectId.zeroId().equals(lastIndex)) {
209 			oldTreeId = ObjectId.zeroId();
210 			tree = RefTree.newEmptyTree();
211 		} else {
212 			try (RevWalk rw = new RevWalk(git)) {
213 				RevCommit c = rw.parseCommit(lastIndex);
214 				oldTreeId = c.getTree();
215 				tree = RefTree.read(rw.getObjectReader(), c.getTree());
216 			}
217 		}
218 
219 		PersonIdent committer = leader.getSystem().newCommitter(ts);
220 		for (Proposal p : todo) {
221 			if (!tree.apply(p.getCommands())) {
222 				// This should not occur, previously during queuing the
223 				// commands were successfully applied to the pending tree.
224 				// Abort the entire round.
225 				throw new IOException(
226 						KetchText.get().queuedProposalFailedToApply);
227 			}
228 
229 			ObjectId treeId = tree.writeTree(inserter);
230 			if (treeId.equals(oldTreeId)) {
231 				continue;
232 			}
233 
234 			CommitBuilder b = new CommitBuilder();
235 			b.setTreeId(treeId);
236 			if (!ObjectId.zeroId().equals(lastIndex)) {
237 				b.setParentId(lastIndex);
238 			}
239 			b.setAuthor(p.getAuthor() != null ? p.getAuthor() : committer);
240 			b.setCommitter(committer);
241 			b.setMessage(message(p));
242 			lastIndex = inserter.insert(b);
243 		}
244 		if (lastIndex.equals(acceptedOldIndex)) {
245 			throw new NoOp();
246 		}
247 		return lastIndex;
248 	}
249 
250 	private String message(Proposal p) {
251 		StringBuilder m = new StringBuilder();
252 		String msg = p.getMessage();
253 		if (msg != null && !msg.isEmpty()) {
254 			m.append(msg);
255 			while (m.length() < 2 || m.charAt(m.length() - 2) != '\n'
256 					|| m.charAt(m.length() - 1) != '\n') {
257 				m.append('\n');
258 			}
259 		}
260 		m.append(KetchConstants.TERM.getName())
261 				.append(": ") //$NON-NLS-1$
262 				.append(leader.getTerm());
263 		return m.toString();
264 	}
265 
266 	void abort() {
267 		for (Proposal p : todo) {
268 			p.abort();
269 		}
270 	}
271 
272 	@Override
273 	void success() {
274 		for (Proposal p : todo) {
275 			p.success();
276 		}
277 	}
278 
279 	private List<ReceiveCommand> makeStageList(Repository git,
280 			ObjectInserter inserter) throws IOException {
281 		// For each branch, collapse consecutive updates to only most recent,
282 		// avoiding sending multiple objects in a rapid fast-forward chain, or
283 		// rewritten content.
284 		Map<String, ObjectId> byRef = new HashMap<>();
285 		for (Proposal p : todo) {
286 			for (Command c : p.getCommands()) {
287 				Ref n = c.getNewRef();
288 				if (n != null && !n.isSymbolic()) {
289 					byRef.put(n.getName(), n.getObjectId());
290 				}
291 			}
292 		}
293 		if (byRef.isEmpty()) {
294 			return Collections.emptyList();
295 		}
296 
297 		Set<ObjectId> newObjs = new HashSet<>(byRef.values());
298 		StageBuilder b = new StageBuilder(
299 				leader.getSystem().getTxnStage(),
300 				acceptedNewIndex);
301 		return b.makeStageList(newObjs, git, inserter);
302 	}
303 
304 	private void blockUntil(ProposedTimestamp ts)
305 			throws TimeIsUncertainException {
306 		List<ProposedTimestamp> times = todo.stream()
307 				.flatMap(p -> p.getProposedTimestamps().stream())
308 				.collect(Collectors.toCollection(ArrayList::new));
309 		times.add(ts);
310 
311 		try {
312 			Duration maxWait = getSystem().getMaxWaitForMonotonicClock();
313 			ProposedTimestamp.blockUntil(times, maxWait);
314 		} catch (InterruptedException | TimeoutException e) {
315 			throw new TimeIsUncertainException(e);
316 		}
317 	}
318 
319 	private static class NoOp extends Exception {
320 		private static final long serialVersionUID = 1L;
321 	}
322 }