2016/03/14

Executor Framework in Android

Executor 為執行緒以及在系統上使用的資源,提供更進一步的控制機制。

Executor 的功能

  1. 建立 worker thread 的 pool 及 queue,控制等待執行的任務數量
  2. 檢查讓執行緒異常結束的錯誤
  3. 等待執行緒結束,並擷取結果
  4. 以固定順序批次執行執行緒,並擷取結果
  5. 啟用背景執行緒,讓使用者能快一點取得結果

SimpleExecutor

針對每一個任務建立一個執行緒。

import java.util.concurrent.Executor;

public class SimpleExecutor implements Executor {
    @Override
    public void execute(Runnable runnable) {
        new Thread(runnable).start();
    }
}

SerialExecutor

以任務佇列 ArrauDeque 儲存所有任務,用 FIFO 的順序執行任務。

private static class SerialExecutor implements Executor {
    final ArrayDequq<Runnable> mTasks = new ArrayDequq<Runnable>();
    Runnable mActive;

    @Override
    public synchronized void execute(final Runnable runnable) {
        mTasks.offer(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        });
        if(mActive==null) {
            scheduleNext();
        }
    }
    
    protected synchronized void scheduleNext() {
        if( (mActive=mTasks.poll()) !=null ) {
            THREAD_POOL_EXECUTOR.execute(mActive);
        }
    }
}

ThreadPool

為避免 APP 建立太多 Thread,物件的建立與銷毀浪費了系統的資源,所以可使用 ThreadPool。Pool 的大小可以由 processors 數量來決定。

Runtime.getRuntime().availableProcessors();

ThreadPool 的參數

  1. 固定尺寸: 由 Executors.newFixedThreadPool(n) 產生
  2. 動態尺寸: pool隨著任務數量多寡,尺寸會變大或縮小
  3. 單執行緒: Executor.newSingleThreadExecutor

具有自訂執行緒特性的固定尺寸 ThreadPool sample:

class LowPriorityThreadFactory implements ThreadFactory {

    private static final String TAG = "LowPriorityThreadFactory";
    private static int count = 1;

    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("LowPrio " + count++);
        t.setPriority(4);
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
        {
            @Override
            public void uncaughtException(Thread t, Throwable e)
            {
                Log.d(TAG, "Thread = " + t.getName() + ", error = " + e.getMessage());
            }
        });
        return t;
    }
}

追蹤 ThreadPool

public class TaskTrackingThreadPool extends ThreadPoolExecutor{

    private static final String TAG = "CustomThreadPool";

    private AtomicInteger mTaskCount = new AtomicInteger(0);

    public TaskTrackingThreadPool() {
        super(3, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        Log.d(TAG, "beforeExecute - thread = " + t.getName());
        mTaskCount.getAndIncrement();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Log.d(TAG, "afterExecute - thread = " + Thread.currentThread().getName() + "t = " + t);
        mTaskCount.getAndDecrement();
    }

    @Override
    protected void terminated() {
        super.terminated();
        Log.d(TAG, "terminated - thread = " + Thread.currentThread().getName());
    }

    public int getNbrOfTasks() {
        return mTaskCount.get();
    }
}

Sample

利用 invokeAll 讓兩個 worker thread 同時執行兩個獨立的任務,在兩個都完成之後,再將結果合併在一起。

public class InvokeActivity extends Activity {


    private static final String TAG = "InvokeActivity";

    private TextView textStatus;

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_invoke);
        textStatus = (TextView) findViewById(R.id.text_status);
    }


    public void onButtonClick(View v) {

         // 讓 UI thread 分擔 invokeAll 的執行器 
        SimpleExecutor simpleExecutor = new SimpleExecutor();
        simpleExecutor.execute(new Runnable() {
            @Override
            public void run() {
                // 存放兩個獨立任務
                List<Callable<String>> tasks = new ArrayList<Callable<String>>();
                tasks.add(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        return getFirstPartialDataFromNetwork();
                    }
                });
                tasks.add(new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                        return getSecondPartialDataFromNetwork();
                    }
                });

                // 使用兩個 threads 執行任務
                ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(2);
                try {
                    Log.d(TAG, "invokeAll");
                    List<Future<String>> futures = executor.invokeAll(tasks);
                    Log.d(TAG, "invokeAll after");

                    final String mashedData = mashupResult(futures);

                    textStatus.post(new Runnable() {
                        @Override
                        public void run() {
                            textStatus.setText(mashedData);
                        }
                    });
                    Log.d(TAG, "mashedData = " + mashedData);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

                executor.shutdown();
            }
        });
    }

    private String getFirstPartialDataFromNetwork() {
        Log.d(TAG, "ProgressReportingTask 1 started");
        SystemClock.sleep(10000);
        Log.d(TAG, "ProgressReportingTask 1 done");
        return "MockA";
    }

    private String getSecondPartialDataFromNetwork() {
        Log.d(TAG, "ProgressReportingTask 2 started");
        SystemClock.sleep(2000);
        Log.d(TAG, "ProgressReportingTask 2 done");
        return "MockB";
    }

    private String mashupResult(List<Future<String>> futures) throws ExecutionException, InterruptedException {
        StringBuilder builder = new StringBuilder();
        for (Future<String> future : futures) {
            builder.append(future.get());
            // 以 future.get 分同步取得結果
        }
        return builder.toString();
    }
}

當 Acvtivity 建立後,就會在背景下載多個圖片

public class ECSImageDownloaderActivity extends Activity {

    private static final String TAG = "ECSImageDownloaderActivity";

    private LinearLayout layoutImages;

    private class ImageDownloadTask implements Callable<Bitmap> {

        @Override
        public Bitmap call() throws Exception {
            return downloadRemoteImage();
        }

        private Bitmap downloadRemoteImage() {
            SystemClock.sleep((int) (5000f - new Random().nextFloat() * 5000f));
            return BitmapFactory.decodeResource(ECSImageDownloaderActivity.this.getResources(), R.drawable.ic_launcher);
        }
    }

    // 代表產生結果的任務的 Callable instance
    private class DownloadCompletionService extends ExecutorCompletionService {

        private ExecutorService mExecutor;

        public DownloadCompletionService(ExecutorService executor) {
            super(executor);
            mExecutor = executor;
        }

        public void shutdown() {
            mExecutor.shutdown();
        }

        public boolean isTerminated() {
            return mExecutor.isTerminated();
        }
    }

    // 由完成 queue 取得任務執行結果的 consumer thread
    private class ConsumerThread extends Thread {

        private DownloadCompletionService mEcs;

        private ConsumerThread(DownloadCompletionService ecs) {
            this.mEcs = ecs;
        }

        @Override
        public void run() {
            super.run();
            try {
                // 如果執行器被終止,所有任務都已經完成
                while(!mEcs.isTerminated()) {
                    Future<Bitmap> future = mEcs.poll(1, TimeUnit.SECONDS);
                    if (future != null) {
                        addImage(future.get());
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_ecs_image_downloader);
        layoutImages = (LinearLayout) findViewById(R.id.layout_images);

        DownloadCompletionService ecs = new DownloadCompletionService(Executors.newCachedThreadPool());
        new ConsumerThread(ecs).start();

        for (int i = 0; i < 5; i++) {
            ecs.submit(new ImageDownloadTask());
        }

        ecs.shutdown();
    }


    private void addImage(final Bitmap image) {
        runOnUiThread(new Runnable() {
            @Override
            public void run() {
                ImageView iv = new ImageView(ECSImageDownloaderActivity.this);
                iv.setImageBitmap(image);
                layoutImages.addView(iv);
            }
        });
    }
}

Reference

Android 高效能多執行緒

沒有留言:

張貼留言