View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.ketch;
12  
13  import static org.eclipse.jgit.internal.ketch.Proposal.State.RUNNING;
14  
15  import java.io.IOException;
16  import java.time.Duration;
17  import java.util.ArrayList;
18  import java.util.Collections;
19  import java.util.HashMap;
20  import java.util.HashSet;
21  import java.util.List;
22  import java.util.Map;
23  import java.util.Set;
24  import java.util.concurrent.TimeoutException;
25  import java.util.stream.Collectors;
26  
27  import org.eclipse.jgit.annotations.Nullable;
28  import org.eclipse.jgit.internal.storage.reftree.Command;
29  import org.eclipse.jgit.internal.storage.reftree.RefTree;
30  import org.eclipse.jgit.lib.CommitBuilder;
31  import org.eclipse.jgit.lib.ObjectId;
32  import org.eclipse.jgit.lib.ObjectInserter;
33  import org.eclipse.jgit.lib.PersonIdent;
34  import org.eclipse.jgit.lib.Ref;
35  import org.eclipse.jgit.lib.Repository;
36  import org.eclipse.jgit.revwalk.RevCommit;
37  import org.eclipse.jgit.revwalk.RevWalk;
38  import org.eclipse.jgit.transport.ReceiveCommand;
39  import org.eclipse.jgit.util.time.ProposedTimestamp;
40  
41  /** A {@link Round} that aggregates and sends user {@link Proposal}s. */
42  class ProposalRound extends Round {
43  	private final List<Proposal> todo;
44  	private RefTree queuedTree;
45  
46  	ProposalRound(KetchLeader leader, LogIndex head, List<Proposal> todo,
47  			@Nullable RefTree tree) {
48  		super(leader, head);
49  		this.todo = todo;
50  
51  		if (tree != null && canCombine(todo)) {
52  			this.queuedTree = tree;
53  		} else {
54  			leader.roundHoldsReferenceToRefTree = false;
55  		}
56  	}
57  
58  	private static boolean canCombine(List<Proposal> todo) {
59  		Proposal first = todo.get(0);
60  		for (int i = 1; i < todo.size(); i++) {
61  			if (!canCombine(first, todo.get(i))) {
62  				return false;
63  			}
64  		}
65  		return true;
66  	}
67  
68  	private static boolean canCombine(Proposalref="../../../../../org/eclipse/jgit/internal/ketch/Proposal.html#Proposal">Proposal a, Proposal b) {
69  		String aMsg = nullToEmpty(a.getMessage());
70  		String bMsg = nullToEmpty(b.getMessage());
71  		return aMsg.equals(bMsg) && canCombine(a.getAuthor(), b.getAuthor());
72  	}
73  
74  	private static String nullToEmpty(@Nullable String str) {
75  		return str != null ? str : ""; //$NON-NLS-1$
76  	}
77  
78  	private static boolean canCombine(@Nullable PersonIdent a,
79  			@Nullable PersonIdent b) {
80  		if (a != null && b != null) {
81  			// Same name and email address. Combine timestamp as the two
82  			// proposals are running concurrently and appear together or
83  			// not at all from the point of view of an outside reader.
84  			return a.getName().equals(b.getName())
85  					&& a.getEmailAddress().equals(b.getEmailAddress());
86  		}
87  
88  		// If a and b are null, both will be the system identity.
89  		return a == null && b == null;
90  	}
91  
92  	@Override
93  	void start() throws IOException {
94  		for (Proposal p : todo) {
95  			p.notifyState(RUNNING);
96  		}
97  		try {
98  			ObjectId id;
99  			try (Repository git = leader.openRepository();
100 					ProposedTimestamp ts = getSystem().getClock().propose()) {
101 				id = insertProposals(git, ts);
102 				blockUntil(ts);
103 			}
104 			runAsync(id);
105 		} catch (NoOp e) {
106 			for (Proposal p : todo) {
107 				p.success();
108 			}
109 			leader.lock.lock();
110 			try {
111 				leader.nextRound();
112 			} finally {
113 				leader.lock.unlock();
114 			}
115 		} catch (IOException e) {
116 			abort();
117 			throw e;
118 		}
119 	}
120 
121 	private ObjectId insertProposals(Repository git, ProposedTimestamp ts)
122 			throws IOException, NoOp {
123 		ObjectId id;
124 		try (ObjectInserter inserter = git.newObjectInserter()) {
125 			// TODO(sop) Process signed push certificates.
126 
127 			if (queuedTree != null) {
128 				id = insertSingleProposal(git, ts, inserter);
129 			} else {
130 				id = insertMultiProposal(git, ts, inserter);
131 			}
132 
133 			stageCommands = makeStageList(git, inserter);
134 			inserter.flush();
135 		}
136 		return id;
137 	}
138 
139 	private ObjectId insertSingleProposal(Repository git, ProposedTimestamp ts,
140 			ObjectInserter inserter) throws IOException, NoOp {
141 		// Fast path: tree is passed in with all proposals applied.
142 		ObjectId treeId = queuedTree.writeTree(inserter);
143 		queuedTree = null;
144 		leader.roundHoldsReferenceToRefTree = false;
145 
146 		if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
147 			try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
148 				RevCommit c = rw.parseCommit(acceptedOldIndex);
149 				if (treeId.equals(c.getTree())) {
150 					throw new NoOp();
151 				}
152 			}
153 		}
154 
155 		Proposal p = todo.get(0);
156 		CommitBuilder b = new CommitBuilder();
157 		b.setTreeId(treeId);
158 		if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
159 			b.setParentId(acceptedOldIndex);
160 		}
161 		b.setCommitter(leader.getSystem().newCommitter(ts));
162 		b.setAuthor(p.getAuthor() != null ? p.getAuthor() : b.getCommitter());
163 		b.setMessage(message(p));
164 		return inserter.insert(b);
165 	}
166 
167 	private ObjectId insertMultiProposal(Repository git, ProposedTimestamp ts,
168 			ObjectInserter inserter) throws IOException, NoOp {
169 		// The tree was not passed in, or there are multiple proposals
170 		// each needing their own commit. Reset the tree and replay each
171 		// proposal in order as individual commits.
172 		ObjectId lastIndex = acceptedOldIndex;
173 		ObjectId oldTreeId;
174 		RefTree tree;
175 		if (ObjectId.zeroId().equals(lastIndex)) {
176 			oldTreeId = ObjectId.zeroId();
177 			tree = RefTree.newEmptyTree();
178 		} else {
179 			try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
180 				RevCommit c = rw.parseCommit(lastIndex);
181 				oldTreeId = c.getTree();
182 				tree = RefTree.read(rw.getObjectReader(), c.getTree());
183 			}
184 		}
185 
186 		PersonIdent committer = leader.getSystem().newCommitter(ts);
187 		for (Proposal p : todo) {
188 			if (!tree.apply(p.getCommands())) {
189 				// This should not occur, previously during queuing the
190 				// commands were successfully applied to the pending tree.
191 				// Abort the entire round.
192 				throw new IOException(
193 						KetchText.get().queuedProposalFailedToApply);
194 			}
195 
196 			ObjectId treeId = tree.writeTree(inserter);
197 			if (treeId.equals(oldTreeId)) {
198 				continue;
199 			}
200 
201 			CommitBuilder b = new CommitBuilder();
202 			b.setTreeId(treeId);
203 			if (!ObjectId.zeroId().equals(lastIndex)) {
204 				b.setParentId(lastIndex);
205 			}
206 			b.setAuthor(p.getAuthor() != null ? p.getAuthor() : committer);
207 			b.setCommitter(committer);
208 			b.setMessage(message(p));
209 			lastIndex = inserter.insert(b);
210 		}
211 		if (lastIndex.equals(acceptedOldIndex)) {
212 			throw new NoOp();
213 		}
214 		return lastIndex;
215 	}
216 
217 	private String message(Proposal p) {
218 		StringBuilder m = new StringBuilder();
219 		String msg = p.getMessage();
220 		if (msg != null && !msg.isEmpty()) {
221 			m.append(msg);
222 			while (m.length() < 2 || m.charAt(m.length() - 2) != '\n'
223 					|| m.charAt(m.length() - 1) != '\n') {
224 				m.append('\n');
225 			}
226 		}
227 		m.append(KetchConstants.TERM.getName())
228 				.append(": ") //$NON-NLS-1$
229 				.append(leader.getTerm());
230 		return m.toString();
231 	}
232 
233 	void abort() {
234 		for (Proposal p : todo) {
235 			p.abort();
236 		}
237 	}
238 
239 	@Override
240 	void success() {
241 		for (Proposal p : todo) {
242 			p.success();
243 		}
244 	}
245 
246 	private List<ReceiveCommand> makeStageList(Repository git,
247 			ObjectInserter inserter) throws IOException {
248 		// For each branch, collapse consecutive updates to only most recent,
249 		// avoiding sending multiple objects in a rapid fast-forward chain, or
250 		// rewritten content.
251 		Map<String, ObjectId> byRef = new HashMap<>();
252 		for (Proposal p : todo) {
253 			for (Command c : p.getCommands()) {
254 				Ref n = c.getNewRef();
255 				if (n != null && !n.isSymbolic()) {
256 					byRef.put(n.getName(), n.getObjectId());
257 				}
258 			}
259 		}
260 		if (byRef.isEmpty()) {
261 			return Collections.emptyList();
262 		}
263 
264 		Set<ObjectId> newObjs = new HashSet<>(byRef.values());
265 		StageBuilder b = new StageBuilder(
266 				leader.getSystem().getTxnStage(),
267 				acceptedNewIndex);
268 		return b.makeStageList(newObjs, git, inserter);
269 	}
270 
271 	private void blockUntil(ProposedTimestamp ts)
272 			throws TimeIsUncertainException {
273 		List<ProposedTimestamp> times = todo.stream()
274 				.flatMap(p -> p.getProposedTimestamps().stream())
275 				.collect(Collectors.toCollection(ArrayList::new));
276 		times.add(ts);
277 
278 		try {
279 			Duration maxWait = getSystem().getMaxWaitForMonotonicClock();
280 			ProposedTimestamp.blockUntil(times, maxWait);
281 		} catch (InterruptedException | TimeoutException e) {
282 			throw new TimeIsUncertainException(e);
283 		}
284 	}
285 
286 	private static class NoOp extends Exception {
287 		private static final long serialVersionUID = 1L;
288 	}
289 }