View Javadoc

1   /***
2    *     Ambient - A music player for the Android platform
3    Copyright (C) 2007 Martin Vysny
4    
5    This program is free software: you can redistribute it and/or modify
6    it under the terms of the GNU General Public License as published by
7    the Free Software Foundation, either version 3 of the License, or
8    (at your option) any later version.
9    
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14  
15   You should have received a copy of the GNU General Public License
16   along with this program.  If not, see <http://www.gnu.org/licenses/>.
17   */
18  
19  package sk.baka.ambient;
20  
21  import java.util.Map;
22  import java.util.concurrent.ConcurrentHashMap;
23  import java.util.concurrent.ConcurrentMap;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.RejectedExecutionException;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicInteger;
30  import java.util.concurrent.locks.ReadWriteLock;
31  import java.util.concurrent.locks.ReentrantReadWriteLock;
32  
33  import sk.baka.ambient.commons.NullArgumentException;
34  import android.util.Log;
35  
36  /***
37   * <p>
38   * Able to execute multiple long operations running simultaneously in the
39   * background. Accepts {@link Runnable} task implementations. The tasks will run
40   * in low-priority daemon thread. Thread-safe.
41   * </p>
42   * <p>
43   * Tasks are differentiated based on a task type object. At most a single
44   * instance of a task of given type may be submitted - multiple submissions of
45   * tasks having the same task type will do nothing unless there is no such task
46   * in executor's queue or already being executed. The task type object must
47   * comply the contract for a {@link Map} key.
48   * </p>
49   * <p>
50   * A task must periodically check for its {@link Thread#isInterrupted()
51   * interrupt} status. If it is interrupted, it should terminate ASAP. It may do
52   * so by throwing an Exception - exceptions are not reported when the process is
53   * interrupted.
54   * </p>
55   * <p>
56   * Uncaught exceptions thrown by tasks are reported using
57   * {@link AmbientApplication#error(Class, boolean, String, Throwable)}.
58   * </p>
59   * <p>
60   * The executor initially starts in offline mode. Use
61   * {@link #setOffline(boolean)} to change the mode.
62   * </p>
63   * <p>
64   * There is no support for mutually exclusive tasks. If you need this
65   * functionality you have to implement it yourself, for example by synchronizing
66   * on the same object instance.
67   * </p>
68   * 
69   * @author Martin Vysny
70   */
71  public final class BackgroundOpExecutor implements ThreadFactory,
72  		IBackgroundTask {
73  	/***
74  	 * Describes a single task.
75  	 * 
76  	 * @author mvy
77  	 */
78  	private static class TaskInfo {
79  		/***
80  		 * The current task name.
81  		 */
82  		public volatile String name;
83  		/***
84  		 * Current progress of the task.
85  		 */
86  		public volatile int progress = 0;
87  		/***
88  		 * The maximum progress.
89  		 */
90  		public volatile int maxProgress = 100;
91  
92  		/***
93  		 * The thread the task is executed in. <code>null</code> if the task is
94  		 * not currently being run.
95  		 */
96  		public volatile Thread thread;
97  		
98  		/***
99  		 * <code>true</code> if the task was canceled using the {@link BackgroundOpExecutor#cancel(Object)} method.
100 		 */
101 		public volatile boolean canceled = false;
102 	}
103 
104 	/***
105 	 * Contains set of scheduled or executing tasks. Maps the task type to the
106 	 * task name.
107 	 */
108 	private final ConcurrentMap<Object, TaskInfo> scheduledTasks = new ConcurrentHashMap<Object, TaskInfo>();
109 
110 	/***
111 	 * <p>
112 	 * Schedules given task for execution. Does nothing if the task is rejected
113 	 * to be submitted. Task may be rejected for these reasons:
114 	 * </p>
115 	 * <ul>
116 	 * <li>it is already scheduled for execution or executed</li>
117 	 * <li>Processing of online task was requested while in offline mode</li>
118 	 * </ul>
119 	 * 
120 	 * @param task
121 	 *            the task to schedule, must not be <code>null</code>.
122 	 * @param taskType
123 	 *            the task type, must not be <code>null</code>.
124 	 * @param online
125 	 *            if <code>true</code> then this task processes some online
126 	 *            content.
127 	 * @param name
128 	 *            the displayable task name. A grammar construct of <code>name +
129 	 *            "failed"</code> should be a meaningful text.
130 	 * @return <code>true</code> if the task was scheduled, <code>false</code>
131 	 *         if it was rejected.
132 	 */
133 	public boolean schedule(final Runnable task, final Object taskType,
134 			final boolean online, final String name) {
135 		if (name == null) {
136 			throw new NullArgumentException("name");
137 		}
138 		if (taskType == null) {
139 			throw new NullArgumentException("taskType");
140 		}
141 		final TaskInfo info = new TaskInfo();
142 		info.name = name;
143 		if (scheduledTasks.putIfAbsent(taskType, info) != null) {
144 			return false;
145 		}
146 		// okay, we have a new task. Schedule it.
147 		boolean scheduled = false;
148 		executorLock.readLock().lock();
149 		try {
150 			final ExecutorService executor;
151 			if (online) {
152 				executor = onlineExecutor;
153 				if (executor == null) {
154 					return false;
155 				}
156 			} else {
157 				executor = offlineExecutor;
158 			}
159 			try {
160 				executor.execute(new ProtectedRunnable(task, info, taskType));
161 			} catch (RejectedExecutionException ex) {
162 				// no luck. we are probably terminating, or someone switched to
163 				// offline mode.
164 				return false;
165 			}
166 			scheduled = true;
167 			return true;
168 		} finally {
169 			if (!scheduled) {
170 				scheduledTasks.remove(taskType);
171 			}
172 			executorLock.readLock().unlock();
173 		}
174 	}
175 
176 	private final AtomicBoolean offline = new AtomicBoolean(true);
177 
178 	/***
179 	 * Checks if this task is already scheduled.
180 	 * 
181 	 * @param taskType
182 	 *            the task handle to check, must not be <code>null</code>.
183 	 * @return <code>true</code> if it is scheduled.
184 	 */
185 	public boolean isScheduledOrExecuted(final Object taskType) {
186 		if (taskType == null) {
187 			throw new NullArgumentException("taskType");
188 		}
189 		return scheduledTasks.containsKey(taskType);
190 	}
191 
192 	private final ReadWriteLock executorLock = new ReentrantReadWriteLock();
193 
194 	/***
195 	 * The online tasks executor.
196 	 */
197 	private ExecutorService onlineExecutor = null;
198 
199 	/***
200 	 * Executes tasks that does not require Internet access.
201 	 */
202 	private ExecutorService offlineExecutor = Executors
203 			.newCachedThreadPool(this);
204 
205 	/***
206 	 * Sets the online/offline state of the executor. During the offline period
207 	 * the executor rejects online tasks.
208 	 * 
209 	 * @param offline
210 	 *            if <code>true</code> then the executor sets itself as offline
211 	 *            and all pending online tasks are terminated.
212 	 */
213 	public void setOffline(final boolean offline) {
214 		if (!this.offline.compareAndSet(!offline, offline)) {
215 			return;
216 		}
217 		executorLock.writeLock().lock();
218 		try {
219 			if (offline) {
220 				// terminate the online task executor
221 				onlineExecutor.shutdownNow();
222 				onlineExecutor = null;
223 			} else {
224 				if (offlineExecutor.isShutdown()) {
225 					return;
226 				}
227 				onlineExecutor = Executors.newCachedThreadPool(this);
228 			}
229 		} finally {
230 			executorLock.writeLock().unlock();
231 		}
232 	}
233 
234 	private final AtomicInteger threadId = new AtomicInteger();
235 
236 	public Thread newThread(Runnable r) {
237 		final Thread result = new Thread(r, "backgroundOp-"
238 				+ threadId.getAndIncrement());
239 		result.setPriority(Thread.MIN_PRIORITY);
240 		result.setDaemon(true);
241 		return result;
242 	}
243 
244 	/***
245 	 * Immediately stops all running/pending tasks.
246 	 */
247 	public void stopAllTasks() {
248 		executorLock.writeLock().lock();
249 		try {
250 			offlineExecutor.shutdownNow();
251 			offlineExecutor = Executors.newCachedThreadPool(this);
252 			if (onlineExecutor != null) {
253 				onlineExecutor.shutdownNow();
254 				onlineExecutor = Executors.newCachedThreadPool(this);
255 			}
256 		} finally {
257 			executorLock.writeLock().unlock();
258 		}
259 	}
260 
261 	/***
262 	 * Immediately terminates the executor and all pending tasks. The executor
263 	 * will reject all submitted tasks from now on.
264 	 */
265 	public void terminate() {
266 		executorLock.readLock().lock();
267 		try {
268 			offlineExecutor.shutdownNow();
269 			if (onlineExecutor != null) {
270 				onlineExecutor.shutdownNow();
271 			}
272 		} finally {
273 			executorLock.readLock().unlock();
274 		}
275 	}
276 
277 	private final String failed = AmbientApplication.getInstance().getString(
278 			R.string.failed);
279 
280 	/***
281 	 * This class wraps given runnable and takes care of counting
282 	 * scheduled/running tasks instances and handling exceptions.
283 	 * 
284 	 * @author Martin Vysny
285 	 */
286 	private class ProtectedRunnable implements Runnable {
287 		private final Runnable r;
288 		private final TaskInfo task;
289 		private final Object taskType;
290 
291 		/***
292 		 * Creates new instance.
293 		 * 
294 		 * @param r
295 		 *            the wrapped runnable.
296 		 * @param task
297 		 *            this task info.
298 		 * @param taskType
299 		 *            this task type.
300 		 */
301 		public ProtectedRunnable(final Runnable r, final TaskInfo task,
302 				final Object taskType) {
303 			this.r = r;
304 			this.task = task;
305 			this.taskType = taskType;
306 		}
307 
308 		public void run() {
309 			if (task.canceled) {
310 				return;
311 			}
312 			// synchronize on the executor instance to prevent reporting
313 			// decreasing numbers. This could happen when one thread increments
314 			// the value and enters the fireEvent method after another thread
315 			// incremented the value but still did not entered the fireEvent.
316 			synchronized (BackgroundOpExecutor.this) {
317 				taskInfo.set(task);
318 				fireEvent();
319 			}
320 			try {
321 				task.thread = Thread.currentThread();
322 				try {
323 					if (task.canceled) {
324 						return;
325 					}
326 					r.run();
327 				} finally {
328 					task.thread = null;
329 				}
330 			} catch (final Throwable e) {
331 				if (Thread.currentThread().isInterrupted()) {
332 					Log.i(r.getClass().getSimpleName(), "Interrupted: "
333 							+ e.getMessage(), e);
334 				} else {
335 					AmbientApplication.getInstance().error(r.getClass(), true,
336 							task.name + " " + failed, e);
337 				}
338 			} finally {
339 				scheduledTasks.remove(taskType);
340 				taskInfo.set(null);
341 				// synchronize on the executor instance to prevent reporting
342 				// increasing numbers
343 				fireEvent();
344 			}
345 		}
346 	}
347 
348 	private final ThreadLocal<TaskInfo> taskInfo = new ThreadLocal<TaskInfo>();
349 
350 	private synchronized void fireEvent() {
351 		String randomName = null;
352 		int taskCount = 0;
353 		int progress = 0;
354 		int maxProgress = 0;
355 		for (final TaskInfo info : scheduledTasks.values()) {
356 			taskCount++;
357 			if (randomName == null) {
358 				randomName = info.name;
359 			}
360 			progress += info.progress;
361 			maxProgress += info.maxProgress;
362 		}
363 		if (maxProgress <= 0) {
364 			maxProgress = 100;
365 		}
366 		AmbientApplication.getInstance().getBus().getInvocator(
367 				IBackgroundTask.class, true).backgroundTask(taskCount,
368 				randomName, progress, maxProgress);
369 	}
370 
371 	/***
372 	 * Sets a progress of the current task. Must be invoked from a task
373 	 * {@link Runnable}.
374 	 * 
375 	 * @param taskCount
376 	 *            unused, ignored.
377 	 * @param name
378 	 *            a new name for the task.
379 	 * @param progress
380 	 *            current task progress.
381 	 * @param maxProgress
382 	 *            maximum progress.
383 	 */
384 	public void backgroundTask(int taskCount, String name, int progress,
385 			int maxProgress) {
386 		final TaskInfo info = taskInfo.get();
387 		if (info == null) {
388 			throw new IllegalStateException("Must be invoked from a task");
389 		}
390 		info.name = name;
391 		info.progress = progress < 0 ? 0 : progress;
392 		info.maxProgress = maxProgress < info.progress ? info.progress
393 				: maxProgress;
394 		fireEvent();
395 	}
396 
397 	/***
398 	 * Cancels given task. Does nothing if no such task is running.
399 	 * 
400 	 * @param taskType
401 	 *            the task type, must not be <code>null</code>.
402 	 */
403 	public void cancel(final Object taskType) {
404 		final TaskInfo info = scheduledTasks.get(taskType);
405 		if (info == null) {
406 			return;
407 		}
408 		info.canceled = true;
409 		final Thread t = info.thread;
410 		if (t != null) {
411 			t.interrupt();
412 		}
413 	}
414 }