1 /***
2 * Ambient - A music player for the Android platform
3 Copyright (C) 2007 Martin Vysny
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package sk.baka.ambient;
20
21 import java.util.Map;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.RejectedExecutionException;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.concurrent.locks.ReadWriteLock;
31 import java.util.concurrent.locks.ReentrantReadWriteLock;
32
33 import sk.baka.ambient.commons.NullArgumentException;
34 import android.util.Log;
35
36 /***
37 * <p>
38 * Able to execute multiple long operations running simultaneously in the
39 * background. Accepts {@link Runnable} task implementations. The tasks will run
40 * in low-priority daemon thread. Thread-safe.
41 * </p>
42 * <p>
43 * Tasks are differentiated based on a task type object. At most a single
44 * instance of a task of given type may be submitted - multiple submissions of
45 * tasks having the same task type will do nothing unless there is no such task
46 * in executor's queue or already being executed. The task type object must
47 * comply the contract for a {@link Map} key.
48 * </p>
49 * <p>
50 * A task must periodically check for its {@link Thread#isInterrupted()
51 * interrupt} status. If it is interrupted, it should terminate ASAP. It may do
52 * so by throwing an Exception - exceptions are not reported when the process is
53 * interrupted.
54 * </p>
55 * <p>
56 * Uncaught exceptions thrown by tasks are reported using
57 * {@link AmbientApplication#error(Class, boolean, String, Throwable)}.
58 * </p>
59 * <p>
60 * The executor initially starts in offline mode. Use
61 * {@link #setOffline(boolean)} to change the mode.
62 * </p>
63 * <p>
64 * There is no support for mutually exclusive tasks. If you need this
65 * functionality you have to implement it yourself, for example by synchronizing
66 * on the same object instance.
67 * </p>
68 *
69 * @author Martin Vysny
70 */
71 public final class BackgroundOpExecutor implements ThreadFactory,
72 IBackgroundTask {
73 /***
74 * Describes a single task.
75 *
76 * @author mvy
77 */
78 private static class TaskInfo {
79 /***
80 * The current task name.
81 */
82 public volatile String name;
83 /***
84 * Current progress of the task.
85 */
86 public volatile int progress = 0;
87 /***
88 * The maximum progress.
89 */
90 public volatile int maxProgress = 100;
91
92 /***
93 * The thread the task is executed in. <code>null</code> if the task is
94 * not currently being run.
95 */
96 public volatile Thread thread;
97
98 /***
99 * <code>true</code> if the task was canceled using the {@link BackgroundOpExecutor#cancel(Object)} method.
100 */
101 public volatile boolean canceled = false;
102 }
103
104 /***
105 * Contains set of scheduled or executing tasks. Maps the task type to the
106 * task name.
107 */
108 private final ConcurrentMap<Object, TaskInfo> scheduledTasks = new ConcurrentHashMap<Object, TaskInfo>();
109
110 /***
111 * <p>
112 * Schedules given task for execution. Does nothing if the task is rejected
113 * to be submitted. Task may be rejected for these reasons:
114 * </p>
115 * <ul>
116 * <li>it is already scheduled for execution or executed</li>
117 * <li>Processing of online task was requested while in offline mode</li>
118 * </ul>
119 *
120 * @param task
121 * the task to schedule, must not be <code>null</code>.
122 * @param taskType
123 * the task type, must not be <code>null</code>.
124 * @param online
125 * if <code>true</code> then this task processes some online
126 * content.
127 * @param name
128 * the displayable task name. A grammar construct of <code>name +
129 * "failed"</code> should be a meaningful text.
130 * @return <code>true</code> if the task was scheduled, <code>false</code>
131 * if it was rejected.
132 */
133 public boolean schedule(final Runnable task, final Object taskType,
134 final boolean online, final String name) {
135 if (name == null) {
136 throw new NullArgumentException("name");
137 }
138 if (taskType == null) {
139 throw new NullArgumentException("taskType");
140 }
141 final TaskInfo info = new TaskInfo();
142 info.name = name;
143 if (scheduledTasks.putIfAbsent(taskType, info) != null) {
144 return false;
145 }
146
147 boolean scheduled = false;
148 executorLock.readLock().lock();
149 try {
150 final ExecutorService executor;
151 if (online) {
152 executor = onlineExecutor;
153 if (executor == null) {
154 return false;
155 }
156 } else {
157 executor = offlineExecutor;
158 }
159 try {
160 executor.execute(new ProtectedRunnable(task, info, taskType));
161 } catch (RejectedExecutionException ex) {
162
163
164 return false;
165 }
166 scheduled = true;
167 return true;
168 } finally {
169 if (!scheduled) {
170 scheduledTasks.remove(taskType);
171 }
172 executorLock.readLock().unlock();
173 }
174 }
175
176 private final AtomicBoolean offline = new AtomicBoolean(true);
177
178 /***
179 * Checks if this task is already scheduled.
180 *
181 * @param taskType
182 * the task handle to check, must not be <code>null</code>.
183 * @return <code>true</code> if it is scheduled.
184 */
185 public boolean isScheduledOrExecuted(final Object taskType) {
186 if (taskType == null) {
187 throw new NullArgumentException("taskType");
188 }
189 return scheduledTasks.containsKey(taskType);
190 }
191
192 private final ReadWriteLock executorLock = new ReentrantReadWriteLock();
193
194 /***
195 * The online tasks executor.
196 */
197 private ExecutorService onlineExecutor = null;
198
199 /***
200 * Executes tasks that does not require Internet access.
201 */
202 private ExecutorService offlineExecutor = Executors
203 .newCachedThreadPool(this);
204
205 /***
206 * Sets the online/offline state of the executor. During the offline period
207 * the executor rejects online tasks.
208 *
209 * @param offline
210 * if <code>true</code> then the executor sets itself as offline
211 * and all pending online tasks are terminated.
212 */
213 public void setOffline(final boolean offline) {
214 if (!this.offline.compareAndSet(!offline, offline)) {
215 return;
216 }
217 executorLock.writeLock().lock();
218 try {
219 if (offline) {
220
221 onlineExecutor.shutdownNow();
222 onlineExecutor = null;
223 } else {
224 if (offlineExecutor.isShutdown()) {
225 return;
226 }
227 onlineExecutor = Executors.newCachedThreadPool(this);
228 }
229 } finally {
230 executorLock.writeLock().unlock();
231 }
232 }
233
234 private final AtomicInteger threadId = new AtomicInteger();
235
236 public Thread newThread(Runnable r) {
237 final Thread result = new Thread(r, "backgroundOp-"
238 + threadId.getAndIncrement());
239 result.setPriority(Thread.MIN_PRIORITY);
240 result.setDaemon(true);
241 return result;
242 }
243
244 /***
245 * Immediately stops all running/pending tasks.
246 */
247 public void stopAllTasks() {
248 executorLock.writeLock().lock();
249 try {
250 offlineExecutor.shutdownNow();
251 offlineExecutor = Executors.newCachedThreadPool(this);
252 if (onlineExecutor != null) {
253 onlineExecutor.shutdownNow();
254 onlineExecutor = Executors.newCachedThreadPool(this);
255 }
256 } finally {
257 executorLock.writeLock().unlock();
258 }
259 }
260
261 /***
262 * Immediately terminates the executor and all pending tasks. The executor
263 * will reject all submitted tasks from now on.
264 */
265 public void terminate() {
266 executorLock.readLock().lock();
267 try {
268 offlineExecutor.shutdownNow();
269 if (onlineExecutor != null) {
270 onlineExecutor.shutdownNow();
271 }
272 } finally {
273 executorLock.readLock().unlock();
274 }
275 }
276
277 private final String failed = AmbientApplication.getInstance().getString(
278 R.string.failed);
279
280 /***
281 * This class wraps given runnable and takes care of counting
282 * scheduled/running tasks instances and handling exceptions.
283 *
284 * @author Martin Vysny
285 */
286 private class ProtectedRunnable implements Runnable {
287 private final Runnable r;
288 private final TaskInfo task;
289 private final Object taskType;
290
291 /***
292 * Creates new instance.
293 *
294 * @param r
295 * the wrapped runnable.
296 * @param task
297 * this task info.
298 * @param taskType
299 * this task type.
300 */
301 public ProtectedRunnable(final Runnable r, final TaskInfo task,
302 final Object taskType) {
303 this.r = r;
304 this.task = task;
305 this.taskType = taskType;
306 }
307
308 public void run() {
309 if (task.canceled) {
310 return;
311 }
312
313
314
315
316 synchronized (BackgroundOpExecutor.this) {
317 taskInfo.set(task);
318 fireEvent();
319 }
320 try {
321 task.thread = Thread.currentThread();
322 try {
323 if (task.canceled) {
324 return;
325 }
326 r.run();
327 } finally {
328 task.thread = null;
329 }
330 } catch (final Throwable e) {
331 if (Thread.currentThread().isInterrupted()) {
332 Log.i(r.getClass().getSimpleName(), "Interrupted: "
333 + e.getMessage(), e);
334 } else {
335 AmbientApplication.getInstance().error(r.getClass(), true,
336 task.name + " " + failed, e);
337 }
338 } finally {
339 scheduledTasks.remove(taskType);
340 taskInfo.set(null);
341
342
343 fireEvent();
344 }
345 }
346 }
347
348 private final ThreadLocal<TaskInfo> taskInfo = new ThreadLocal<TaskInfo>();
349
350 private synchronized void fireEvent() {
351 String randomName = null;
352 int taskCount = 0;
353 int progress = 0;
354 int maxProgress = 0;
355 for (final TaskInfo info : scheduledTasks.values()) {
356 taskCount++;
357 if (randomName == null) {
358 randomName = info.name;
359 }
360 progress += info.progress;
361 maxProgress += info.maxProgress;
362 }
363 if (maxProgress <= 0) {
364 maxProgress = 100;
365 }
366 AmbientApplication.getInstance().getBus().getInvocator(
367 IBackgroundTask.class, true).backgroundTask(taskCount,
368 randomName, progress, maxProgress);
369 }
370
371 /***
372 * Sets a progress of the current task. Must be invoked from a task
373 * {@link Runnable}.
374 *
375 * @param taskCount
376 * unused, ignored.
377 * @param name
378 * a new name for the task.
379 * @param progress
380 * current task progress.
381 * @param maxProgress
382 * maximum progress.
383 */
384 public void backgroundTask(int taskCount, String name, int progress,
385 int maxProgress) {
386 final TaskInfo info = taskInfo.get();
387 if (info == null) {
388 throw new IllegalStateException("Must be invoked from a task");
389 }
390 info.name = name;
391 info.progress = progress < 0 ? 0 : progress;
392 info.maxProgress = maxProgress < info.progress ? info.progress
393 : maxProgress;
394 fireEvent();
395 }
396
397 /***
398 * Cancels given task. Does nothing if no such task is running.
399 *
400 * @param taskType
401 * the task type, must not be <code>null</code>.
402 */
403 public void cancel(final Object taskType) {
404 final TaskInfo info = scheduledTasks.get(taskType);
405 if (info == null) {
406 return;
407 }
408 info.canceled = true;
409 final Thread t = info.thread;
410 if (t != null) {
411 t.interrupt();
412 }
413 }
414 }