View Javadoc
1   /*
2    * Copyright (C) 2016, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.internal.ketch;
12  
13  import static org.eclipse.jgit.internal.ketch.KetchReplica.CommitMethod.ALL_REFS;
14  import static org.eclipse.jgit.lib.Ref.Storage.NETWORK;
15  import static org.eclipse.jgit.transport.ReceiveCommand.Result.LOCK_FAILURE;
16  import static org.eclipse.jgit.transport.ReceiveCommand.Result.NOT_ATTEMPTED;
17  import static org.eclipse.jgit.transport.ReceiveCommand.Result.OK;
18  import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NODELETE;
19  import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD;
20  import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.LinkedHashMap;
27  import java.util.List;
28  import java.util.Map;
29  
30  import org.eclipse.jgit.annotations.Nullable;
31  import org.eclipse.jgit.errors.NotSupportedException;
32  import org.eclipse.jgit.errors.TransportException;
33  import org.eclipse.jgit.lib.AnyObjectId;
34  import org.eclipse.jgit.lib.NullProgressMonitor;
35  import org.eclipse.jgit.lib.ObjectId;
36  import org.eclipse.jgit.lib.ObjectIdRef;
37  import org.eclipse.jgit.lib.Ref;
38  import org.eclipse.jgit.lib.Repository;
39  import org.eclipse.jgit.transport.FetchConnection;
40  import org.eclipse.jgit.transport.PushConnection;
41  import org.eclipse.jgit.transport.ReceiveCommand;
42  import org.eclipse.jgit.transport.RemoteConfig;
43  import org.eclipse.jgit.transport.RemoteRefUpdate;
44  import org.eclipse.jgit.transport.Transport;
45  import org.eclipse.jgit.transport.URIish;
46  
47  /**
48   * Representation of a Git repository on a remote replica system.
49   * <p>
50   * {@link org.eclipse.jgit.internal.ketch.KetchLeader} will contact the replica
51   * using the Git wire protocol.
52   * <p>
53   * The remote replica may be fully Ketch-aware, or a standard Git server.
54   */
55  public class RemoteGitReplica extends KetchReplica {
56  	private final URIish uri;
57  	private final RemoteConfig remoteConfig;
58  
59  	/**
60  	 * Configure a new remote.
61  	 *
62  	 * @param leader
63  	 *            instance this replica follows.
64  	 * @param name
65  	 *            unique-ish name identifying this remote for debugging.
66  	 * @param uri
67  	 *            URI to connect to the follower's repository.
68  	 * @param cfg
69  	 *            how Ketch should treat the remote system.
70  	 * @param rc
71  	 *            optional remote configuration describing how to contact the
72  	 *            peer repository.
73  	 */
74  	public RemoteGitReplica(KetchLeader leader, String name, URIish uri,
75  			ReplicaConfig cfg, @Nullable RemoteConfig rc) {
76  		super(leader, name, cfg);
77  		this.uri = uri;
78  		this.remoteConfig = rc;
79  	}
80  
81  	/**
82  	 * Get URI to contact the remote peer repository.
83  	 *
84  	 * @return URI to contact the remote peer repository.
85  	 */
86  	public URIish getURI() {
87  		return uri;
88  	}
89  
90  	/**
91  	 * Get optional configuration describing how to contact the peer.
92  	 *
93  	 * @return optional configuration describing how to contact the peer.
94  	 */
95  	@Nullable
96  	protected RemoteConfig getRemoteConfig() {
97  		return remoteConfig;
98  	}
99  
100 	/** {@inheritDoc} */
101 	@Override
102 	protected String describeForLog() {
103 		return String.format("%s @ %s", getName(), getURI()); //$NON-NLS-1$
104 	}
105 
106 	/** {@inheritDoc} */
107 	@Override
108 	protected void startPush(ReplicaPushRequest req) {
109 		getSystem().getExecutor().execute(() -> {
110 			try (Repository git = getLeader().openRepository()) {
111 				try {
112 					push(git, req);
113 					req.done(git);
114 				} catch (Throwable err) {
115 					req.setException(git, err);
116 				}
117 			} catch (IOException err) {
118 				req.setException(null, err);
119 			}
120 		});
121 	}
122 
123 	private void push(Repository repo, ReplicaPushRequest req)
124 			throws NotSupportedException, TransportException, IOException {
125 		Map<String, Ref> adv;
126 		List<RemoteCommand> cmds = asUpdateList(req.getCommands());
127 		try (Transport transport = Transport.open(repo, uri)) {
128 			RemoteConfig rc = getRemoteConfig();
129 			if (rc != null) {
130 				transport.applyConfig(rc);
131 			}
132 			transport.setPushAtomic(true);
133 			adv = push(repo, transport, cmds);
134 		}
135 		for (RemoteCommand c : cmds) {
136 			c.copyStatusToResult();
137 		}
138 		req.setRefs(adv);
139 	}
140 
141 	private Map<String, Ref> push(Repository git, Transport transport,
142 			List<RemoteCommand> cmds) throws IOException {
143 		Map<String, RemoteRefUpdate> updates = asUpdateMap(cmds);
144 		try (PushConnection connection = transport.openPush()) {
145 			Map<String, Ref> adv = connection.getRefsMap();
146 			RemoteRefUpdate accepted = updates.get(getSystem().getTxnAccepted());
147 			if (accepted != null && !isExpectedValue(adv, accepted)) {
148 				abort(cmds);
149 				return adv;
150 			}
151 
152 			RemoteRefUpdate committed = updates.get(getSystem().getTxnCommitted());
153 			if (committed != null && !isExpectedValue(adv, committed)) {
154 				abort(cmds);
155 				return adv;
156 			}
157 			if (committed != null && getCommitMethod() == ALL_REFS) {
158 				prepareCommit(git, cmds, updates, adv,
159 						committed.getNewObjectId());
160 			}
161 
162 			connection.push(NullProgressMonitor.INSTANCE, updates);
163 			return adv;
164 		}
165 	}
166 
167 	private static boolean isExpectedValue(Map<String, Ref> adv,
168 			RemoteRefUpdate u) {
169 		Ref r = adv.get(u.getRemoteName());
170 		if (!AnyObjectId.isEqual(getId(r), u.getExpectedOldObjectId())) {
171 			((RemoteCommand) u).cmd.setResult(LOCK_FAILURE);
172 			return false;
173 		}
174 		return true;
175 	}
176 
177 	private void prepareCommit(Repository git, List<RemoteCommand> cmds,
178 			Map<String, RemoteRefUpdate> updates, Map<String, Ref> adv,
179 			ObjectId committed) throws IOException {
180 		for (ReceiveCommand cmd : prepareCommit(git, adv, committed)) {
181 			RemoteCommand c = new RemoteCommand(cmd);
182 			cmds.add(c);
183 			updates.put(c.getRemoteName(), c);
184 		}
185 	}
186 
187 	private static List<RemoteCommand> asUpdateList(
188 			Collection<ReceiveCommand> cmds) {
189 		try {
190 			List<RemoteCommand> toPush = new ArrayList<>(cmds.size());
191 			for (ReceiveCommand cmd : cmds) {
192 				toPush.add(new RemoteCommand(cmd));
193 			}
194 			return toPush;
195 		} catch (IOException e) {
196 			// Cannot occur as no IO was required to build the command.
197 			throw new IllegalStateException(e);
198 		}
199 	}
200 
201 	private static Map<String, RemoteRefUpdate> asUpdateMap(
202 			List<RemoteCommand> cmds) {
203 		Map<String, RemoteRefUpdate> m = new LinkedHashMap<>();
204 		for (RemoteCommand cmd : cmds) {
205 			m.put(cmd.getRemoteName(), cmd);
206 		}
207 		return m;
208 	}
209 
210 	private static void abort(List<RemoteCommand> cmds) {
211 		List<ReceiveCommand> tmp = new ArrayList<>(cmds.size());
212 		for (RemoteCommand cmd : cmds) {
213 			tmp.add(cmd.cmd);
214 		}
215 		ReceiveCommand.abort(tmp);
216 	}
217 
218 	/** {@inheritDoc} */
219 	@Override
220 	protected void blockingFetch(Repository repo, ReplicaFetchRequest req)
221 			throws NotSupportedException, TransportException {
222 		try (Transport transport = Transport.open(repo, uri)) {
223 			RemoteConfig rc = getRemoteConfig();
224 			if (rc != null) {
225 				transport.applyConfig(rc);
226 			}
227 			fetch(transport, req);
228 		}
229 	}
230 
231 	private void fetch(Transport transport, ReplicaFetchRequest req)
232 			throws NotSupportedException, TransportException {
233 		try (FetchConnection conn = transport.openFetch()) {
234 			Map<String, Ref> remoteRefs = conn.getRefsMap();
235 			req.setRefs(remoteRefs);
236 
237 			List<Ref> want = new ArrayList<>();
238 			for (String name : req.getWantRefs()) {
239 				Ref ref = remoteRefs.get(name);
240 				if (ref != null && ref.getObjectId() != null) {
241 					want.add(ref);
242 				}
243 			}
244 			for (ObjectId id : req.getWantObjects()) {
245 				want.add(new ObjectIdRef.Unpeeled(NETWORK, id.name(), id));
246 			}
247 
248 			conn.fetch(NullProgressMonitor.INSTANCE, want,
249 					Collections.<ObjectId> emptySet());
250 		}
251 	}
252 
253 	static class RemoteCommand extends RemoteRefUpdate {
254 		final ReceiveCommand cmd;
255 
256 		RemoteCommand(ReceiveCommand cmd) throws IOException {
257 			super(null, null,
258 					cmd.getNewId(), cmd.getRefName(),
259 					true /* force update */,
260 					null /* no local tracking ref */,
261 					cmd.getOldId());
262 			this.cmd = cmd;
263 		}
264 
265 		void copyStatusToResult() {
266 			if (cmd.getResult() == NOT_ATTEMPTED) {
267 				switch (getStatus()) {
268 				case OK:
269 				case UP_TO_DATE:
270 				case NON_EXISTING:
271 					cmd.setResult(OK);
272 					break;
273 
274 				case REJECTED_NODELETE:
275 					cmd.setResult(REJECTED_NODELETE);
276 					break;
277 
278 				case REJECTED_NONFASTFORWARD:
279 					cmd.setResult(REJECTED_NONFASTFORWARD);
280 					break;
281 
282 				case REJECTED_OTHER_REASON:
283 					cmd.setResult(REJECTED_OTHER_REASON, getMessage());
284 					break;
285 
286 				default:
287 					cmd.setResult(REJECTED_OTHER_REASON, getStatus().name());
288 					break;
289 				}
290 			}
291 		}
292 	}
293 }