1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static org.eclipse.jgit.internal.ketch.Proposal.State.RUNNING;
14
15 import java.io.IOException;
16 import java.time.Duration;
17 import java.util.ArrayList;
18 import java.util.Collections;
19 import java.util.HashMap;
20 import java.util.HashSet;
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Set;
24 import java.util.concurrent.TimeoutException;
25 import java.util.stream.Collectors;
26
27 import org.eclipse.jgit.annotations.Nullable;
28 import org.eclipse.jgit.internal.storage.reftree.Command;
29 import org.eclipse.jgit.internal.storage.reftree.RefTree;
30 import org.eclipse.jgit.lib.CommitBuilder;
31 import org.eclipse.jgit.lib.ObjectId;
32 import org.eclipse.jgit.lib.ObjectInserter;
33 import org.eclipse.jgit.lib.PersonIdent;
34 import org.eclipse.jgit.lib.Ref;
35 import org.eclipse.jgit.lib.Repository;
36 import org.eclipse.jgit.revwalk.RevCommit;
37 import org.eclipse.jgit.revwalk.RevWalk;
38 import org.eclipse.jgit.transport.ReceiveCommand;
39 import org.eclipse.jgit.util.time.ProposedTimestamp;
40
41
42 class ProposalRound extends Round {
43 private final List<Proposal> todo;
44 private RefTree queuedTree;
45
46 ProposalRound(KetchLeader leader, LogIndex head, List<Proposal> todo,
47 @Nullable RefTree tree) {
48 super(leader, head);
49 this.todo = todo;
50
51 if (tree != null && canCombine(todo)) {
52 this.queuedTree = tree;
53 } else {
54 leader.roundHoldsReferenceToRefTree = false;
55 }
56 }
57
58 private static boolean canCombine(List<Proposal> todo) {
59 Proposal first = todo.get(0);
60 for (int i = 1; i < todo.size(); i++) {
61 if (!canCombine(first, todo.get(i))) {
62 return false;
63 }
64 }
65 return true;
66 }
67
68 private static boolean canCombine(Proposalref="../../../../../org/eclipse/jgit/internal/ketch/Proposal.html#Proposal">Proposal a, Proposal b) {
69 String aMsg = nullToEmpty(a.getMessage());
70 String bMsg = nullToEmpty(b.getMessage());
71 return aMsg.equals(bMsg) && canCombine(a.getAuthor(), b.getAuthor());
72 }
73
74 private static String nullToEmpty(@Nullable String str) {
75 return str != null ? str : "";
76 }
77
78 private static boolean canCombine(@Nullable PersonIdent a,
79 @Nullable PersonIdent b) {
80 if (a != null && b != null) {
81
82
83
84 return a.getName().equals(b.getName())
85 && a.getEmailAddress().equals(b.getEmailAddress());
86 }
87
88
89 return a == null && b == null;
90 }
91
92 @Override
93 void start() throws IOException {
94 for (Proposal p : todo) {
95 p.notifyState(RUNNING);
96 }
97 try {
98 ObjectId id;
99 try (Repository git = leader.openRepository();
100 ProposedTimestamp ts = getSystem().getClock().propose()) {
101 id = insertProposals(git, ts);
102 blockUntil(ts);
103 }
104 runAsync(id);
105 } catch (NoOp e) {
106 for (Proposal p : todo) {
107 p.success();
108 }
109 leader.lock.lock();
110 try {
111 leader.nextRound();
112 } finally {
113 leader.lock.unlock();
114 }
115 } catch (IOException e) {
116 abort();
117 throw e;
118 }
119 }
120
121 private ObjectId insertProposals(Repository git, ProposedTimestamp ts)
122 throws IOException, NoOp {
123 ObjectId id;
124 try (ObjectInserter inserter = git.newObjectInserter()) {
125
126
127 if (queuedTree != null) {
128 id = insertSingleProposal(git, ts, inserter);
129 } else {
130 id = insertMultiProposal(git, ts, inserter);
131 }
132
133 stageCommands = makeStageList(git, inserter);
134 inserter.flush();
135 }
136 return id;
137 }
138
139 private ObjectId insertSingleProposal(Repository git, ProposedTimestamp ts,
140 ObjectInserter inserter) throws IOException, NoOp {
141
142 ObjectId treeId = queuedTree.writeTree(inserter);
143 queuedTree = null;
144 leader.roundHoldsReferenceToRefTree = false;
145
146 if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
147 try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
148 RevCommit c = rw.parseCommit(acceptedOldIndex);
149 if (treeId.equals(c.getTree())) {
150 throw new NoOp();
151 }
152 }
153 }
154
155 Proposal p = todo.get(0);
156 CommitBuilder b = new CommitBuilder();
157 b.setTreeId(treeId);
158 if (!ObjectId.zeroId().equals(acceptedOldIndex)) {
159 b.setParentId(acceptedOldIndex);
160 }
161 b.setCommitter(leader.getSystem().newCommitter(ts));
162 b.setAuthor(p.getAuthor() != null ? p.getAuthor() : b.getCommitter());
163 b.setMessage(message(p));
164 return inserter.insert(b);
165 }
166
167 private ObjectId insertMultiProposal(Repository git, ProposedTimestamp ts,
168 ObjectInserter inserter) throws IOException, NoOp {
169
170
171
172 ObjectId lastIndex = acceptedOldIndex;
173 ObjectId oldTreeId;
174 RefTree tree;
175 if (ObjectId.zeroId().equals(lastIndex)) {
176 oldTreeId = ObjectId.zeroId();
177 tree = RefTree.newEmptyTree();
178 } else {
179 try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
180 RevCommit c = rw.parseCommit(lastIndex);
181 oldTreeId = c.getTree();
182 tree = RefTree.read(rw.getObjectReader(), c.getTree());
183 }
184 }
185
186 PersonIdent committer = leader.getSystem().newCommitter(ts);
187 for (Proposal p : todo) {
188 if (!tree.apply(p.getCommands())) {
189
190
191
192 throw new IOException(
193 KetchText.get().queuedProposalFailedToApply);
194 }
195
196 ObjectId treeId = tree.writeTree(inserter);
197 if (treeId.equals(oldTreeId)) {
198 continue;
199 }
200
201 CommitBuilder b = new CommitBuilder();
202 b.setTreeId(treeId);
203 if (!ObjectId.zeroId().equals(lastIndex)) {
204 b.setParentId(lastIndex);
205 }
206 b.setAuthor(p.getAuthor() != null ? p.getAuthor() : committer);
207 b.setCommitter(committer);
208 b.setMessage(message(p));
209 lastIndex = inserter.insert(b);
210 }
211 if (lastIndex.equals(acceptedOldIndex)) {
212 throw new NoOp();
213 }
214 return lastIndex;
215 }
216
217 private String message(Proposal p) {
218 StringBuilder m = new StringBuilder();
219 String msg = p.getMessage();
220 if (msg != null && !msg.isEmpty()) {
221 m.append(msg);
222 while (m.length() < 2 || m.charAt(m.length() - 2) != '\n'
223 || m.charAt(m.length() - 1) != '\n') {
224 m.append('\n');
225 }
226 }
227 m.append(KetchConstants.TERM.getName())
228 .append(": ")
229 .append(leader.getTerm());
230 return m.toString();
231 }
232
233 void abort() {
234 for (Proposal p : todo) {
235 p.abort();
236 }
237 }
238
239 @Override
240 void success() {
241 for (Proposal p : todo) {
242 p.success();
243 }
244 }
245
246 private List<ReceiveCommand> makeStageList(Repository git,
247 ObjectInserter inserter) throws IOException {
248
249
250
251 Map<String, ObjectId> byRef = new HashMap<>();
252 for (Proposal p : todo) {
253 for (Command c : p.getCommands()) {
254 Ref n = c.getNewRef();
255 if (n != null && !n.isSymbolic()) {
256 byRef.put(n.getName(), n.getObjectId());
257 }
258 }
259 }
260 if (byRef.isEmpty()) {
261 return Collections.emptyList();
262 }
263
264 Set<ObjectId> newObjs = new HashSet<>(byRef.values());
265 StageBuilder b = new StageBuilder(
266 leader.getSystem().getTxnStage(),
267 acceptedNewIndex);
268 return b.makeStageList(newObjs, git, inserter);
269 }
270
271 private void blockUntil(ProposedTimestamp ts)
272 throws TimeIsUncertainException {
273 List<ProposedTimestamp> times = todo.stream()
274 .flatMap(p -> p.getProposedTimestamps().stream())
275 .collect(Collectors.toCollection(ArrayList::new));
276 times.add(ts);
277
278 try {
279 Duration maxWait = getSystem().getMaxWaitForMonotonicClock();
280 ProposedTimestamp.blockUntil(times, maxWait);
281 } catch (InterruptedException | TimeoutException e) {
282 throw new TimeIsUncertainException(e);
283 }
284 }
285
286 private static class NoOp extends Exception {
287 private static final long serialVersionUID = 1L;
288 }
289 }