1
2
3
4
5
6
7
8
9
10
11 package org.eclipse.jgit.internal.ketch;
12
13 import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
14 import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
15 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
16 import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
17 import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
18 import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
19 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
20 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
21
22 import java.net.URISyntaxException;
23 import java.time.Duration;
24 import java.util.ArrayList;
25 import java.util.List;
26 import java.util.Random;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import org.eclipse.jgit.annotations.Nullable;
33 import org.eclipse.jgit.lib.Config;
34 import org.eclipse.jgit.lib.PersonIdent;
35 import org.eclipse.jgit.lib.Repository;
36 import org.eclipse.jgit.transport.RemoteConfig;
37 import org.eclipse.jgit.transport.URIish;
38 import org.eclipse.jgit.util.time.MonotonicClock;
39 import org.eclipse.jgit.util.time.MonotonicSystemClock;
40 import org.eclipse.jgit.util.time.ProposedTimestamp;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44
45
46
47
48
49
50
51
52
53
54 public class KetchSystem {
55 private static final Random RNG = new Random();
56
57
58
59
60
61
62 public static ScheduledExecutorService defaultExecutor() {
63 return DefaultExecutorHolder.I;
64 }
65
66 private final ScheduledExecutorService executor;
67 private final MonotonicClock clock;
68 private final String txnNamespace;
69 private final String txnAccepted;
70 private final String txnCommitted;
71 private final String txnStage;
72
73
74
75
76 public KetchSystem() {
77 this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
78 }
79
80
81
82
83
84
85
86
87
88
89
90
91
92 public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
93 String txnNamespace) {
94 this.executor = executor;
95 this.clock = clock;
96 this.txnNamespace = txnNamespace;
97 this.txnAccepted = txnNamespace + ACCEPTED;
98 this.txnCommitted = txnNamespace + COMMITTED;
99 this.txnStage = txnNamespace + STAGE;
100 }
101
102
103
104
105
106
107 public ScheduledExecutorService getExecutor() {
108 return executor;
109 }
110
111
112
113
114
115
116 public MonotonicClock getClock() {
117 return clock;
118 }
119
120
121
122
123
124
125
126
127
128
129 public Duration getMaxWaitForMonotonicClock() {
130 return Duration.ofSeconds(5);
131 }
132
133
134
135
136
137
138
139
140
141 public boolean requireMonotonicLeaderElections() {
142 return false;
143 }
144
145
146
147
148
149
150 public String getTxnNamespace() {
151 return txnNamespace;
152 }
153
154
155
156
157
158
159 public String getTxnAccepted() {
160 return txnAccepted;
161 }
162
163
164
165
166
167
168 public String getTxnCommitted() {
169 return txnCommitted;
170 }
171
172
173
174
175
176
177 public String getTxnStage() {
178 return txnStage;
179 }
180
181
182
183
184
185
186
187
188 public PersonIdent newCommitter(ProposedTimestamp time) {
189 String name = "ketch";
190 String email = "ketch@system";
191 return new PersonIdent(name, email, time);
192 }
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215 @Nullable
216 public String newLeaderTag() {
217 int n = RNG.nextInt(1 << (6 * 4));
218 return String.format("%06x", Integer.valueOf(n));
219 }
220
221
222
223
224
225
226
227
228
229
230 public KetchLeader createLeader(Repository repo)
231 throws URISyntaxException {
232 KetchLeader leader = new KetchLeader(this) {
233 @Override
234 protected Repository openRepository() {
235 repo.incrementOpen();
236 return repo;
237 }
238 };
239 leader.setReplicas(createReplicas(leader, repo));
240 return leader;
241 }
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256 protected List<KetchReplica> createReplicas(KetchLeader leader,
257 Repository repo) throws URISyntaxException {
258 List<KetchReplica> replicas = new ArrayList<>();
259 Config cfg = repo.getConfig();
260 String localName = getLocalName(cfg);
261 for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
262 if (!hasParticipation(cfg, name)) {
263 continue;
264 }
265
266 ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
267 if (name.equals(localName)) {
268 replicas.add(new LocalReplica(leader, name, kc));
269 continue;
270 }
271
272 RemoteConfig rc = new RemoteConfig(cfg, name);
273 List<URIish> uris = rc.getPushURIs();
274 if (uris.isEmpty()) {
275 uris = rc.getURIs();
276 }
277 for (URIish uri : uris) {
278 String n = uris.size() == 1 ? name : uri.getHost();
279 replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
280 }
281 }
282 return replicas;
283 }
284
285 private static boolean hasParticipation(Config cfg, String name) {
286 return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
287 }
288
289 private static String getLocalName(Config cfg) {
290 return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
291 }
292
293 static class DefaultExecutorHolder {
294 private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
295 static final ScheduledExecutorService I = create();
296
297 private static ScheduledExecutorService create() {
298 int cores = Runtime.getRuntime().availableProcessors();
299 int threads = Math.max(5, cores);
300 log.info("Using {} threads", Integer.valueOf(threads));
301 return Executors.newScheduledThreadPool(
302 threads,
303 new ThreadFactory() {
304 private final AtomicInteger threadCnt = new AtomicInteger();
305
306 @Override
307 public Thread newThread(Runnable r) {
308 int id = threadCnt.incrementAndGet();
309 Thread thr = new Thread(r);
310 thr.setName("KetchExecutor-" + id);
311 return thr;
312 }
313 });
314 }
315
316 private DefaultExecutorHolder() {
317 }
318 }
319
320 }