View Javadoc
1   /*
2    * Copyright (C) 2010, Google Inc. and others
3    *
4    * This program and the accompanying materials are made available under the
5    * terms of the Eclipse Distribution License v. 1.0 which is available at
6    * https://www.eclipse.org/org/documents/edl-v10.php.
7    *
8    * SPDX-License-Identifier: BSD-3-Clause
9    */
10  
11  package org.eclipse.jgit.lib;
12  
13  import java.util.concurrent.Semaphore;
14  import java.util.concurrent.atomic.AtomicInteger;
15  import java.util.concurrent.locks.ReentrantLock;
16  
17  /**
18   * Wrapper around the general {@link org.eclipse.jgit.lib.ProgressMonitor} to
19   * make it thread safe.
20   *
21   * Updates to the underlying ProgressMonitor are made only from the thread that
22   * allocated this wrapper. Callers are responsible for ensuring the allocating
23   * thread uses {@link #pollForUpdates()} or {@link #waitForCompletion()} to
24   * update the underlying ProgressMonitor.
25   *
26   * Only {@link #update(int)}, {@link #isCancelled()}, and {@link #endWorker()}
27   * may be invoked from a worker thread. All other methods of the ProgressMonitor
28   * interface can only be called from the thread that allocates this wrapper.
29   */
30  public class ThreadSafeProgressMonitor implements ProgressMonitor {
31  	private final ProgressMonitor pm;
32  
33  	private final ReentrantLock lock;
34  
35  	private final Thread mainThread;
36  
37  	private final AtomicInteger workers;
38  
39  	private final AtomicInteger pendingUpdates;
40  
41  	private final Semaphore process;
42  
43  	/**
44  	 * Wrap a ProgressMonitor to be thread safe.
45  	 *
46  	 * @param pm
47  	 *            the underlying monitor to receive events.
48  	 */
49  	public ThreadSafeProgressMonitor(ProgressMonitor pm) {
50  		this.pm = pm;
51  		this.lock = new ReentrantLock();
52  		this.mainThread = Thread.currentThread();
53  		this.workers = new AtomicInteger(0);
54  		this.pendingUpdates = new AtomicInteger(0);
55  		this.process = new Semaphore(0);
56  	}
57  
58  	/** {@inheritDoc} */
59  	@Override
60  	public void start(int totalTasks) {
61  		if (!isMainThread())
62  			throw new IllegalStateException();
63  		pm.start(totalTasks);
64  	}
65  
66  	/** {@inheritDoc} */
67  	@Override
68  	public void beginTask(String title, int totalWork) {
69  		if (!isMainThread())
70  			throw new IllegalStateException();
71  		pm.beginTask(title, totalWork);
72  	}
73  
74  	/**
75  	 * Notify the monitor a worker is starting.
76  	 */
77  	public void startWorker() {
78  		startWorkers(1);
79  	}
80  
81  	/**
82  	 * Notify the monitor of workers starting.
83  	 *
84  	 * @param count
85  	 *            the number of worker threads that are starting.
86  	 */
87  	public void startWorkers(int count) {
88  		workers.addAndGet(count);
89  	}
90  
91  	/**
92  	 * Notify the monitor a worker is finished.
93  	 */
94  	public void endWorker() {
95  		if (workers.decrementAndGet() == 0)
96  			process.release();
97  	}
98  
99  	/**
100 	 * Non-blocking poll for pending updates.
101 	 *
102 	 * This method can only be invoked by the same thread that allocated this
103 	 * ThreadSafeProgressMonior.
104 	 */
105 	public void pollForUpdates() {
106 		assert isMainThread();
107 		doUpdates();
108 	}
109 
110 	/**
111 	 * Process pending updates and wait for workers to finish.
112 	 *
113 	 * This method can only be invoked by the same thread that allocated this
114 	 * ThreadSafeProgressMonior.
115 	 *
116 	 * @throws java.lang.InterruptedException
117 	 *             if the main thread is interrupted while waiting for
118 	 *             completion of workers.
119 	 */
120 	public void waitForCompletion() throws InterruptedException {
121 		assert isMainThread();
122 		while (0 < workers.get()) {
123 			doUpdates();
124 			process.acquire();
125 		}
126 		doUpdates();
127 	}
128 
129 	private void doUpdates() {
130 		int cnt = pendingUpdates.getAndSet(0);
131 		if (0 < cnt)
132 			pm.update(cnt);
133 	}
134 
135 	/** {@inheritDoc} */
136 	@Override
137 	public void update(int completed) {
138 		if (0 == pendingUpdates.getAndAdd(completed))
139 			process.release();
140 	}
141 
142 	/** {@inheritDoc} */
143 	@Override
144 	public boolean isCancelled() {
145 		lock.lock();
146 		try {
147 			return pm.isCancelled();
148 		} finally {
149 			lock.unlock();
150 		}
151 	}
152 
153 	/** {@inheritDoc} */
154 	@Override
155 	public void endTask() {
156 		if (!isMainThread())
157 			throw new IllegalStateException();
158 		pm.endTask();
159 	}
160 
161 	private boolean isMainThread() {
162 		return Thread.currentThread() == mainThread;
163 	}
164 }