View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc.
3    * and other copyright owners as documented in the project's IP log.
4    *
5    * This program and the accompanying materials are made available
6    * under the terms of the Eclipse Distribution License v1.0 which
7    * accompanies this distribution, is reproduced below, and is
8    * available at http://www.eclipse.org/org/documents/edl-v10.php
9    *
10   * All rights reserved.
11   *
12   * Redistribution and use in source and binary forms, with or
13   * without modification, are permitted provided that the following
14   * conditions are met:
15   *
16   * - Redistributions of source code must retain the above copyright
17   *   notice, this list of conditions and the following disclaimer.
18   *
19   * - Redistributions in binary form must reproduce the above
20   *   copyright notice, this list of conditions and the following
21   *   disclaimer in the documentation and/or other materials provided
22   *   with the distribution.
23   *
24   * - Neither the name of the Eclipse Foundation, Inc. nor the
25   *   names of its contributors may be used to endorse or promote
26   *   products derived from this software without specific prior
27   *   written permission.
28   *
29   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
30   * CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
31   * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
32   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
33   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
34   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
35   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
36   * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
37   * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
38   * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
39   * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
40   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
41   * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42   */
43  
44  package org.eclipse.jgit.internal.ketch;
45  
46  import static org.eclipse.jgit.internal.ketch.KetchConstants.ACCEPTED;
47  import static org.eclipse.jgit.internal.ketch.KetchConstants.COMMITTED;
48  import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_KEY_TYPE;
49  import static org.eclipse.jgit.internal.ketch.KetchConstants.CONFIG_SECTION_KETCH;
50  import static org.eclipse.jgit.internal.ketch.KetchConstants.DEFAULT_TXN_NAMESPACE;
51  import static org.eclipse.jgit.internal.ketch.KetchConstants.STAGE;
52  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_NAME;
53  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_KEY_REMOTE;
54  
55  import java.net.URISyntaxException;
56  import java.time.Duration;
57  import java.util.ArrayList;
58  import java.util.List;
59  import java.util.Random;
60  import java.util.concurrent.Executors;
61  import java.util.concurrent.ScheduledExecutorService;
62  import java.util.concurrent.ThreadFactory;
63  import java.util.concurrent.atomic.AtomicInteger;
64  
65  import org.eclipse.jgit.annotations.Nullable;
66  import org.eclipse.jgit.lib.Config;
67  import org.eclipse.jgit.lib.PersonIdent;
68  import org.eclipse.jgit.lib.Repository;
69  import org.eclipse.jgit.transport.RemoteConfig;
70  import org.eclipse.jgit.transport.URIish;
71  import org.eclipse.jgit.util.time.MonotonicClock;
72  import org.eclipse.jgit.util.time.MonotonicSystemClock;
73  import org.eclipse.jgit.util.time.ProposedTimestamp;
74  import org.slf4j.Logger;
75  import org.slf4j.LoggerFactory;
76  
77  /**
78   * Ketch system-wide configuration.
79   * <p>
80   * This class provides useful defaults for testing and small proof of concepts.
81   * Full scale installations are expected to subclass and override methods to
82   * provide consistent configuration across all managed repositories.
83   * <p>
84   * Servers should configure their own
85   * {@link java.util.concurrent.ScheduledExecutorService}.
86   */
87  public class KetchSystem {
88  	private static final Random RNG = new Random();
89  
90  	/**
91  	 * Get default executor, one thread per available processor.
92  	 *
93  	 * @return default executor, one thread per available processor.
94  	 */
95  	public static ScheduledExecutorService defaultExecutor() {
96  		return DefaultExecutorHolder.I;
97  	}
98  
99  	private final ScheduledExecutorService executor;
100 	private final MonotonicClock clock;
101 	private final String txnNamespace;
102 	private final String txnAccepted;
103 	private final String txnCommitted;
104 	private final String txnStage;
105 
106 	/**
107 	 * Create a default system with a thread pool of 1 thread per CPU.
108 	 */
109 	public KetchSystem() {
110 		this(defaultExecutor(), new MonotonicSystemClock(), DEFAULT_TXN_NAMESPACE);
111 	}
112 
113 	/**
114 	 * Create a Ketch system with the provided executor service.
115 	 *
116 	 * @param executor
117 	 *            thread pool to run background operations.
118 	 * @param clock
119 	 *            clock to create timestamps.
120 	 * @param txnNamespace
121 	 *            reference namespace for the RefTree graph and associated
122 	 *            transaction state. Must begin with {@code "refs/"} and end
123 	 *            with {@code '/'}, for example {@code "refs/txn/"}.
124 	 */
125 	public KetchSystem(ScheduledExecutorService executor, MonotonicClock clock,
126 			String txnNamespace) {
127 		this.executor = executor;
128 		this.clock = clock;
129 		this.txnNamespace = txnNamespace;
130 		this.txnAccepted = txnNamespace + ACCEPTED;
131 		this.txnCommitted = txnNamespace + COMMITTED;
132 		this.txnStage = txnNamespace + STAGE;
133 	}
134 
135 	/**
136 	 * Get executor to perform background operations.
137 	 *
138 	 * @return executor to perform background operations.
139 	 */
140 	public ScheduledExecutorService getExecutor() {
141 		return executor;
142 	}
143 
144 	/**
145 	 * Get clock to obtain timestamps from.
146 	 *
147 	 * @return clock to obtain timestamps from.
148 	 */
149 	public MonotonicClock getClock() {
150 		return clock;
151 	}
152 
153 	/**
154 	 * Get how long the leader will wait for the {@link #getClock()}'s
155 	 * {@code ProposedTimestamp} used in commits proposed to the RefTree graph
156 	 * ({@link #getTxnAccepted()})
157 	 *
158 	 * @return how long the leader will wait for the {@link #getClock()}'s
159 	 *         {@code ProposedTimestamp} used in commits proposed to the RefTree
160 	 *         graph ({@link #getTxnAccepted()}). Defaults to 5 seconds.
161 	 */
162 	public Duration getMaxWaitForMonotonicClock() {
163 		return Duration.ofSeconds(5);
164 	}
165 
166 	/**
167 	 * Whether elections should require monotonically increasing commit
168 	 * timestamps
169 	 *
170 	 * @return {@code true} if elections should require monotonically increasing
171 	 *         commit timestamps. This requires a very good
172 	 *         {@link org.eclipse.jgit.util.time.MonotonicClock}.
173 	 */
174 	public boolean requireMonotonicLeaderElections() {
175 		return false;
176 	}
177 
178 	/**
179 	 * Get the namespace used for the RefTree graph and transaction management.
180 	 *
181 	 * @return reference namespace such as {@code "refs/txn/"}.
182 	 */
183 	public String getTxnNamespace() {
184 		return txnNamespace;
185 	}
186 
187 	/**
188 	 * Get name of the accepted RefTree graph.
189 	 *
190 	 * @return name of the accepted RefTree graph.
191 	 */
192 	public String getTxnAccepted() {
193 		return txnAccepted;
194 	}
195 
196 	/**
197 	 * Get name of the committed RefTree graph.
198 	 *
199 	 * @return name of the committed RefTree graph.
200 	 */
201 	public String getTxnCommitted() {
202 		return txnCommitted;
203 	}
204 
205 	/**
206 	 * Get prefix for staged objects, e.g. {@code "refs/txn/stage/"}.
207 	 *
208 	 * @return prefix for staged objects, e.g. {@code "refs/txn/stage/"}.
209 	 */
210 	public String getTxnStage() {
211 		return txnStage;
212 	}
213 
214 	/**
215 	 * Create new committer {@code PersonIdent} for ketch system
216 	 *
217 	 * @param time
218 	 *            timestamp for the committer.
219 	 * @return identity line for the committer header of a RefTreeGraph.
220 	 */
221 	public PersonIdent newCommitter(ProposedTimestamp time) {
222 		String name = "ketch"; //$NON-NLS-1$
223 		String email = "ketch@system"; //$NON-NLS-1$
224 		return new PersonIdent(name, email, time);
225 	}
226 
227 	/**
228 	 * Construct a random tag to identify a candidate during leader election.
229 	 * <p>
230 	 * Multiple processes trying to elect themselves leaders at exactly the same
231 	 * time (rounded to seconds) using the same
232 	 * {@link #newCommitter(ProposedTimestamp)} identity strings, for the same
233 	 * term, may generate the same ObjectId for the election commit and falsely
234 	 * assume they have both won.
235 	 * <p>
236 	 * Candidates add this tag to their election ballot commit to disambiguate
237 	 * the election. The tag only needs to be unique for a given triplet of
238 	 * {@link #newCommitter(ProposedTimestamp)}, system time (rounded to
239 	 * seconds), and term. If every replica in the system uses a unique
240 	 * {@code newCommitter} (such as including the host name after the
241 	 * {@code "@"} in the email address) the tag could be the empty string.
242 	 * <p>
243 	 * The default implementation generates a few bytes of random data.
244 	 *
245 	 * @return unique tag; null or empty string if {@code newCommitter()} is
246 	 *         sufficiently unique to identify the leader.
247 	 */
248 	@Nullable
249 	public String newLeaderTag() {
250 		int n = RNG.nextInt(1 << (6 * 4));
251 		return String.format("%06x", Integer.valueOf(n)); //$NON-NLS-1$
252 	}
253 
254 	/**
255 	 * Construct the KetchLeader instance of a repository.
256 	 *
257 	 * @param repo
258 	 *            local repository stored by the leader.
259 	 * @return leader instance.
260 	 * @throws java.net.URISyntaxException
261 	 *             a follower configuration contains an unsupported URI.
262 	 */
263 	public KetchLeader createLeader(Repository repo)
264 			throws URISyntaxException {
265 		KetchLeader leader = new KetchLeader(this) {
266 			@Override
267 			protected Repository openRepository() {
268 				repo.incrementOpen();
269 				return repo;
270 			}
271 		};
272 		leader.setReplicas(createReplicas(leader, repo));
273 		return leader;
274 	}
275 
276 	/**
277 	 * Get the collection of replicas for a repository.
278 	 * <p>
279 	 * The collection of replicas must include the local repository.
280 	 *
281 	 * @param leader
282 	 *            the leader driving these replicas.
283 	 * @param repo
284 	 *            repository to get the replicas of.
285 	 * @return collection of replicas for the specified repository.
286 	 * @throws java.net.URISyntaxException
287 	 *             a configured URI is invalid.
288 	 */
289 	protected List<KetchReplica> createReplicas(KetchLeader leader,
290 			Repository repo) throws URISyntaxException {
291 		List<KetchReplica> replicas = new ArrayList<>();
292 		Config cfg = repo.getConfig();
293 		String localName = getLocalName(cfg);
294 		for (String name : cfg.getSubsections(CONFIG_KEY_REMOTE)) {
295 			if (!hasParticipation(cfg, name)) {
296 				continue;
297 			}
298 
299 			ReplicaConfig kc = ReplicaConfig.newFromConfig(cfg, name);
300 			if (name.equals(localName)) {
301 				replicas.add(new LocalReplica(leader, name, kc));
302 				continue;
303 			}
304 
305 			RemoteConfig rc = new RemoteConfig(cfg, name);
306 			List<URIish> uris = rc.getPushURIs();
307 			if (uris.isEmpty()) {
308 				uris = rc.getURIs();
309 			}
310 			for (URIish uri : uris) {
311 				String n = uris.size() == 1 ? name : uri.getHost();
312 				replicas.add(new RemoteGitReplica(leader, n, uri, kc, rc));
313 			}
314 		}
315 		return replicas;
316 	}
317 
318 	private static boolean hasParticipation(Config cfg, String name) {
319 		return cfg.getString(CONFIG_KEY_REMOTE, name, CONFIG_KEY_TYPE) != null;
320 	}
321 
322 	private static String getLocalName(Config cfg) {
323 		return cfg.getString(CONFIG_SECTION_KETCH, null, CONFIG_KEY_NAME);
324 	}
325 
326 	static class DefaultExecutorHolder {
327 		private static final Logger log = LoggerFactory.getLogger(KetchSystem.class);
328 		static final ScheduledExecutorService I = create();
329 
330 		private static ScheduledExecutorService create() {
331 			int cores = Runtime.getRuntime().availableProcessors();
332 			int threads = Math.max(5, cores);
333 			log.info("Using {} threads", Integer.valueOf(threads)); //$NON-NLS-1$
334 			return Executors.newScheduledThreadPool(
335 				threads,
336 				new ThreadFactory() {
337 					private final AtomicInteger threadCnt = new AtomicInteger();
338 
339 					@Override
340 					public Thread newThread(Runnable r) {
341 						int id = threadCnt.incrementAndGet();
342 						Thread thr = new Thread(r);
343 						thr.setName("KetchExecutor-" + id); //$NON-NLS-1$
344 						return thr;
345 					}
346 				});
347 		}
348 
349 		private DefaultExecutorHolder() {
350 		}
351 	}
352 
353 	/**
354 	 * Compute a delay in a {@code min..max} interval with random jitter.
355 	 *
356 	 * @param last
357 	 *            amount of delay waited before the last attempt. This is used
358 	 *            to seed the next delay interval. Should be 0 if there was no
359 	 *            prior delay.
360 	 * @param min
361 	 *            shortest amount of allowable delay between attempts.
362 	 * @param max
363 	 *            longest amount of allowable delay between attempts.
364 	 * @return new amount of delay to wait before the next attempt.
365 	 */
366 	static long delay(long last, long min, long max) {
367 		long r = Math.max(0, last * 3 - min);
368 		if (r > 0) {
369 			int c = (int) Math.min(r + 1, Integer.MAX_VALUE);
370 			r = RNG.nextInt(c);
371 		}
372 		return Math.max(Math.min(min + r, max), min);
373 	}
374 }