View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.ketch;
12  
13  import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
14  import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
15  import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
16  import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
17  import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
18  import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
19  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
20  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
21  
22  import java.net.URISyntaxException;
23  import java.time.Duration;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.Random;
27  import java.util.concurrent.Executors;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.concurrent.ThreadFactory;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  import org.eclipse.jgit.annotations.Nullable;
33  import org.eclipse.jgit.lib.Config;
34  import org.eclipse.jgit.lib.PersonIdent;
35  import org.eclipse.jgit.lib.Repository;
36  import org.eclipse.jgit.transport.RemoteConfig;
37  import org.eclipse.jgit.transport.URIish;
38  import org.eclipse.jgit.util.time.MonotonicClock;
39  import org.eclipse.jgit.util.time.MonotonicSystemClock;
40  import org.eclipse.jgit.util.time.ProposedTimestamp;
41  import org.slf4j.Logger;
42  import org.slf4j.LoggerFactory;
43  
44  /**
45   * Ketch system-wide configuration.
46   * <p>
47   * This class provides useful defaults for testing and small proof of concepts.
48   * Full scale installations are expected to subclass and override methods to
49   * provide consistent configuration across all managed repositories.
50   * <p>
51   * Servers should configure their own
52   * {@link java.util.concurrent.ScheduledExecutorService}.
53   */
54  public class KetchSystem {
55  	private static final Random RNG = new Random();
56  
57  	/**
58  	 * Get default executor, one thread per available processor.
59  	 *
60  	 * @return default executor, one thread per available processor.
61  	 */
62  	public static ScheduledExecutorService defaultExecutor() {
63  		return DefaultExecutorHolder.I;
64  	}
65  
66  	private final ScheduledExecutorService executor;
67  	private final MonotonicClock clock;
68  	private final String txnNamespace;
69  	private final String txnAccepted;
70  	private final String txnCommitted;
71  	private final String txnStage;
72  
73  	/**
74  	 * Create a default system with a thread pool of 1 thread per CPU.
75  	 */
76  	public KetchSystem() {
77  		this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
78  	}
79  
80  	/**
81  	 * Create a Ketch system with the provided executor service.
82  	 *
83  	 * @param executor
84  	 *            thread pool to run background operations.
85  	 * @param clock
86  	 *            clock to create timestamps.
87  	 * @param txnNamespace
88  	 *            reference namespace for the RefTree graph and associated
89  	 *            transaction state. Must begin with {@code "refs/"} and end
90  	 *            with {@code '/'}, for example {@code "refs/txn/"}.
91  	 */
92  	public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
93  			String txnNamespace) {
94  		this.executor = executor;
95  		this.clock = clock;
96  		this.txnNamespace = txnNamespace;
97  		this.txnAccepted = txnNamespace + ACCEPTED;
98  		this.txnCommitted = txnNamespace + COMMITTED;
99  		this.txnStage = txnNamespace + STAGE;
100 	}
101 
102 	/**
103 	 * Get executor to perform background operations.
104 	 *
105 	 * @return executor to perform background operations.
106 	 */
107 	public ScheduledExecutorService getExecutor() {
108 		return executor;
109 	}
110 
111 	/**
112 	 * Get clock to obtain timestamps from.
113 	 *
114 	 * @return clock to obtain timestamps from.
115 	 */
116 	public MonotonicClock getClock() {
117 		return clock;
118 	}
119 
120 	/**
121 	 * Get how long the leader will wait for the {@link #getClock()}'s
122 	 * {@code ProposedTimestamp} used in commits proposed to the RefTree graph
123 	 * ({@link #getTxnAccepted()})
124 	 *
125 	 * @return how long the leader will wait for the {@link #getClock()}'s
126 	 *         {@code ProposedTimestamp} used in commits proposed to the RefTree
127 	 *         graph ({@link #getTxnAccepted()}). Defaults to 5 seconds.
128 	 */
129 	public Duration getMaxWaitForMonotonicClock() {
130 		return Duration.ofSeconds(5);
131 	}
132 
133 	/**
134 	 * Whether elections should require monotonically increasing commit
135 	 * timestamps
136 	 *
137 	 * @return {@code true} if elections should require monotonically increasing
138 	 *         commit timestamps. This requires a very good
139 	 *         {@link org.eclipse.jgit.util.time.MonotonicClock}.
140 	 */
141 	public boolean requireMonotonicLeaderElections() {
142 		return false;
143 	}
144 
145 	/**
146 	 * Get the namespace used for the RefTree graph and transaction management.
147 	 *
148 	 * @return reference namespace such as {@code "refs/txn/"}.
149 	 */
150 	public String getTxnNamespace() {
151 		return txnNamespace;
152 	}
153 
154 	/**
155 	 * Get name of the accepted RefTree graph.
156 	 *
157 	 * @return name of the accepted RefTree graph.
158 	 */
159 	public String getTxnAccepted() {
160 		return txnAccepted;
161 	}
162 
163 	/**
164 	 * Get name of the committed RefTree graph.
165 	 *
166 	 * @return name of the committed RefTree graph.
167 	 */
168 	public String getTxnCommitted() {
169 		return txnCommitted;
170 	}
171 
172 	/**
173 	 * Get prefix for staged objects, e.g. {@code "refs/txn/stage/"}.
174 	 *
175 	 * @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}.
176 	 */
177 	public String getTxnStage() {
178 		return txnStage;
179 	}
180 
181 	/**
182 	 * Create new committer {@code PersonIdent} for ketch system
183 	 *
184 	 * @param time
185 	 *            timestamp for the committer.
186 	 * @return identity line for the committer header of a RefTreeGraph.
187 	 */
188 	public PersonIdent newCommitter(ProposedTimestamp time) {
189 		String name = "ketch"; //$NON-NLS-1$
190 		String email = "ketch@system"; //$NON-NLS-1$
191 		return new PersonIdent(name, email, time);
192 	}
193 
194 	/**
195 	 * Construct a random tag to identify a candidate during leader election.
196 	 * <p>
197 	 * Multiple processes trying to elect themselves leaders at exactly the same
198 	 * time (rounded to seconds) using the same
199 	 * {@link #newCommitter(ProposedTimestamp)} identity strings, for the same
200 	 * term, may generate the same ObjectId for the election commit and falsely
201 	 * assume they have both won.
202 	 * <p>
203 	 * Candidates add this tag to their election ballot commit to disambiguate
204 	 * the election. The tag only needs to be unique for a given triplet of
205 	 * {@link #newCommitter(ProposedTimestamp)}, system time (rounded to
206 	 * seconds), and term. If every replica in the system uses a unique
207 	 * {@code newCommitter} (such as including the host name after the
208 	 * {@code "@"} in the email address) the tag could be the empty string.
209 	 * <p>
210 	 * The default implementation generates a few bytes of random data.
211 	 *
212 	 * @return unique tag; null or empty string if {@code newCommitter()} is
213 	 *         sufficiently unique to identify the leader.
214 	 */
215 	@Nullable
216 	public String newLeaderTag() {
217 		int n = RNG.nextInt(1 << (6 * 4));
218 		return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$
219 	}
220 
221 	/**
222 	 * Construct the KetchLeader instance of a repository.
223 	 *
224 	 * @param repo
225 	 *            local repository stored by the leader.
226 	 * @return leader instance.
227 	 * @throws java.net.URISyntaxException
228 	 *             a follower configuration contains an unsupported URI.
229 	 */
230 	public KetchLeader createLeader(Repository repo)
231 			throws URISyntaxException {
232 		KetchLeader leader = new KetchLeader(this) {
233 			@Override
234 			protected Repository openRepository() {
235 				repo.incrementOpen();
236 				return repo;
237 			}
238 		};
239 		leader.setReplicas(createReplicas(leader, repo));
240 		return leader;
241 	}
242 
243 	/**
244 	 * Get the collection of replicas for a repository.
245 	 * <p>
246 	 * The collection of replicas must include the local repository.
247 	 *
248 	 * @param leader
249 	 *            the leader driving these replicas.
250 	 * @param repo
251 	 *            repository to get the replicas of.
252 	 * @return collection of replicas for the specified repository.
253 	 * @throws java.net.URISyntaxException
254 	 *             a configured URI is invalid.
255 	 */
256 	protected List<KetchReplica> createReplicas(KetchLeader leader,
257 			Repository repo) throws URISyntaxException {
258 		List<KetchReplica> replicas = new ArrayList<>();
259 		Config cfg = repo.getConfig();
260 		String localName = getLocalName(cfg);
261 		for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
262 			if (!hasParticipation(cfg, name)) {
263 				continue;
264 			}
265 
266 			ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
267 			if (name.equals(localName)) {
268 				replicas.add(new LocalReplica(leader, name, kc));
269 				continue;
270 			}
271 
272 			RemoteConfig rc = new RemoteConfig(cfg, name);
273 			List<URIish> uris = rc.getPushURIs();
274 			if (uris.isEmpty()) {
275 				uris = rc.getURIs();
276 			}
277 			for (URIish uri : uris) {
278 				String n = uris.size() == 1 ? name : uri.getHost();
279 				replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
280 			}
281 		}
282 		return replicas;
283 	}
284 
285 	private static boolean hasParticipation(Config cfg, String name) {
286 		return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
287 	}
288 
289 	private static String getLocalName(Config cfg) {
290 		return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
291 	}
292 
293 	static class DefaultExecutorHolder {
294 		private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
295 		static final ScheduledExecutorService I = create();
296 
297 		private static ScheduledExecutorService create() {
298 			int cores = Runtime.getRuntime().availableProcessors();
299 			int threads = Math.max(5, cores);
300 			log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$
301 			return Executors.newScheduledThreadPool(
302 				threads,
303 				new ThreadFactory() {
304 					private final AtomicInteger threadCnt = new AtomicInteger();
305 
306 					@Override
307 					public Thread newThread(Runnable r) {
308 						int id = threadCnt.incrementAndGet();
309 						Thread thr = new Thread(r);
310 						thr.setName("KetchExecutor-" + id); //$NON-NLS-1$
311 						return thr;
312 					}
313 				});
314 		}
315 
316 		private DefaultExecutorHolder() {
317 		}
318 	}
319 
320 }