1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 package org.eclipse.jgit.internal.ketch;
45
46 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
47 import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.TXN_COMMITTED;
48 import static org.eclipse.jgit.lib.RefDatabase.ALL;
49 import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
50 import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
51
52 import java.io.IOException;
53 import java.text.MessageFormat;
54 import java.util.ArrayList;
55 import java.util.Collection;
56 import java.util.List;
57 import java.util.Map;
58
59 import org.eclipse.jgit.internal.storage.reftree.RefTreeDatabase;
60 import org.eclipse.jgit.lib.BatchRefUpdate;
61 import org.eclipse.jgit.lib.NullProgressMonitor;
62 import org.eclipse.jgit.lib.Ref;
63 import org.eclipse.jgit.lib.RefDatabase;
64 import org.eclipse.jgit.lib.Repository;
65 import org.eclipse.jgit.revwalk.RevWalk;
66 import org.eclipse.jgit.transport.ReceiveCommand;
67 import org.eclipse.jgit.util.time.MonotonicClock;
68 import org.eclipse.jgit.util.time.ProposedTimestamp;
69
70
71
72
73
74 public class LocalReplica extends KetchReplica {
75
76
77
78
79
80
81
82
83
84
85 public LocalReplica(KetchLeader leader, String name, ReplicaConfig cfg) {
86 super(leader, name, cfg);
87 }
88
89
90 @Override
91 protected String describeForLog() {
92 return String.format("%s (leader)", getName());
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106 void initialize(Repository repo) throws IOException {
107 RefDatabase refdb = repo.getRefDatabase();
108 if (refdb instanceof RefTreeDatabase) {
109 RefTreeDatabase treeDb = (RefTreeDatabase) refdb;
110 String txnNamespace = getSystem().getTxnNamespace();
111 if (!txnNamespace.equals(treeDb.getTxnNamespace())) {
112 throw new IOException(MessageFormat.format(
113 KetchText.get().mismatchedTxnNamespace,
114 txnNamespace, treeDb.getTxnNamespace()));
115 }
116 refdb = treeDb.getBootstrap();
117 }
118 initialize(refdb.exactRef(
119 getSystem().getTxnAccepted(),
120 getSystem().getTxnCommitted()));
121 }
122
123
124 @Override
125 protected void startPush(ReplicaPushRequest req) {
126 getSystem().getExecutor().execute(() -> {
127 MonotonicClock clk = getSystem().getClock();
128 try (Repository git = getLeader().openRepository();
129 ProposedTimestamp ts = clk.propose()) {
130 try {
131 update(git, req, ts);
132 req.done(git);
133 } catch (Throwable err) {
134 req.setException(git, err);
135 }
136 } catch (IOException err) {
137 req.setException(null, err);
138 }
139 });
140 }
141
142
143 @Override
144 protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
145 throws IOException {
146 throw new IOException(KetchText.get().cannotFetchFromLocalReplica);
147 }
148
149 private void update(Repository git, ReplicaPushRequest req,
150 ProposedTimestamp ts) throws IOException {
151 RefDatabase refdb = git.getRefDatabase();
152 CommitMethod method = getCommitMethod();
153
154
155
156 if (refdb instanceof RefTreeDatabase) {
157 if (!isOnlyTxnNamespace(req.getCommands())) {
158 return;
159 }
160
161 refdb = ((RefTreeDatabase) refdb).getBootstrap();
162 method = TXN_COMMITTED;
163 }
164
165 BatchRefUpdate batch = refdb.newBatchUpdate();
166 batch.addProposedTimestamp(ts);
167 batch.setRefLogIdent(getSystem().newCommitter(ts));
168 batch.setRefLogMessage("ketch", false);
169 batch.setAllowNonFastForwards(true);
170
171
172
173
174
175 ReceiveCommand accepted = null;
176 ReceiveCommand committed = null;
177 for (ReceiveCommand cmd : req.getCommands()) {
178 String name = cmd.getRefName();
179 if (name.equals(getSystem().getTxnAccepted())) {
180 accepted = cmd;
181 } else if (name.equals(getSystem().getTxnCommitted())) {
182 committed = cmd;
183 } else {
184 batch.addCommand(cmd);
185 }
186 }
187 if (committed != null && method == ALL_REFS) {
188 Map<String, Ref> refs = refdb.getRefs(ALL);
189 batch.addCommand(prepareCommit(git, refs, committed.getNewId()));
190 }
191 if (accepted != null) {
192 batch.addCommand(accepted);
193 }
194 if (committed != null) {
195 batch.addCommand(committed);
196 }
197
198 try (RevWalklk/RevWalk.html#RevWalk">RevWalk rw = new RevWalk(git)) {
199 batch.execute(rw, NullProgressMonitor.INSTANCE);
200 }
201
202
203
204
205 List<String> failed = new ArrayList<>(2);
206 checkFailed(failed, accepted);
207 checkFailed(failed, committed);
208 if (!failed.isEmpty()) {
209 String[] arr = failed.toArray(new String[0]);
210 req.setRefs(refdb.exactRef(arr));
211 }
212 }
213
214 private static void checkFailed(List<String> failed, ReceiveCommand cmd) {
215 if (cmd != null && cmd.getResult() != OK) {
216 failed.add(cmd.getRefName());
217 }
218 }
219
220 private boolean isOnlyTxnNamespace(Collection<ReceiveCommand> cmdList) {
221
222
223
224 String txnNamespace = getSystem().getTxnNamespace();
225 for (ReceiveCommand cmd : cmdList) {
226 if (!cmd.getRefName().startsWith(txnNamespace)) {
227 cmd.setResult(REJECTED_OTHER_REASON,
228 MessageFormat.format(
229 KetchText.get().outsideTxnNamespace,
230 cmd.getRefName(), txnNamespace));
231 ReceiveCommand.abort(cmdList);
232 return false;
233 }
234 }
235 return true;
236 }
237 }