Dispatcher负责分发任务,比如提交请求、取消请求、暂停请求、恢复请求等等。
DispatcherThread和DispatcherHandler
Dispatcher通过DispatcherThread和DispatcherHandler开启一条线程来分发任务,把任务分发到线程池和主线程中执行。Dispatcher的dispatchXXX方法给DispatcherHandler发送Message。DispatcherHandler在DispatcherThread中调用Dispatcher的performXXX方法来处理任务。
/**
 * Excerpt of the Dispatcher constructor: starts the dispatcher thread and binds a
 * {@link DispatcherHandler} to that thread's looper. Initialization not relevant to the
 * threading setup is elided in this excerpt.
 */
Dispatcher(Context context, ExecutorService service, Handler mainThreadHandler,
    Downloader downloader, Cache cache, Stats stats) {
  // Create and start the DispatcherThread.
  this.dispatcherThread = new DispatcherThread();
  this.dispatcherThread.start();
  // ... (elided in this excerpt)
  // Create the DispatcherHandler that processes tasks on the dispatcher thread's looper.
  this.handler = new DispatcherHandler(dispatcherThread.getLooper(), this);
  // ... (elided in this excerpt)
}

/** A HandlerThread named with the Picasso thread prefix, running at background priority. */
static class DispatcherThread extends HandlerThread {
  DispatcherThread() {
    super(Utils.THREAD_PREFIX + DISPATCHER_THREAD_NAME, THREAD_PRIORITY_BACKGROUND);
  }
}

/**
 * Handler bound to the dispatcher looper. Each message code is routed to the matching
 * {@code performXXX} method of the owning {@link Dispatcher}.
 */
private static class DispatcherHandler extends Handler {
  private final Dispatcher dispatcher;

  public DispatcherHandler(Looper looper, Dispatcher dispatcher) {
    super(looper);
    this.dispatcher = dispatcher;
  }

  /**
   * Routes {@code msg} to the dispatcher based on {@code msg.what}.
   *
   * @param msg the message to handle; {@code msg.obj} carries the Action, tag, BitmapHunter
   *     or NetworkInfo payload depending on the message code
   */
  @Override public void handleMessage(final Message msg) {
    switch (msg.what) {
      case REQUEST_SUBMIT: {
        Action action = (Action) msg.obj;
        dispatcher.performSubmit(action);
        break;
      }
      case REQUEST_CANCEL: {
        Action action = (Action) msg.obj;
        dispatcher.performCancel(action);
        break;
      }
      case TAG_PAUSE: {
        Object tag = msg.obj;
        dispatcher.performPauseTag(tag);
        break;
      }
      case TAG_RESUME: {
        Object tag = msg.obj;
        dispatcher.performResumeTag(tag);
        break;
      }
      case HUNTER_COMPLETE: {
        BitmapHunter hunter = (BitmapHunter) msg.obj;
        dispatcher.performComplete(hunter);
        break;
      }
      case HUNTER_RETRY: {
        BitmapHunter hunter = (BitmapHunter) msg.obj;
        dispatcher.performRetry(hunter);
        break;
      }
      case HUNTER_DECODE_FAILED: {
        BitmapHunter hunter = (BitmapHunter) msg.obj;
        // A decode failure is reported as an error that will not be replayed.
        dispatcher.performError(hunter, false);
        break;
      }
      case HUNTER_DELAY_NEXT_BATCH: {
        dispatcher.performBatchComplete();
        break;
      }
      case NETWORK_STATE_CHANGE: {
        NetworkInfo info = (NetworkInfo) msg.obj;
        dispatcher.performNetworkStateChange(info);
        break;
      }
      case AIRPLANE_MODE_CHANGE: {
        dispatcher.performAirplaneModeChange(msg.arg1 == AIRPLANE_MODE_ON);
        break;
      }
      default:
        // Unknown message codes are surfaced by throwing from a Runnable posted to
        // Picasso.HANDLER instead of from this handler's thread.
        Picasso.HANDLER.post(new Runnable() {
          @Override public void run() {
            throw new AssertionError("Unknown handler message received: " + msg.what);
          }
        });
    }
  }
}
flushStackLocalLeaks
Dispatcher的构造函数调用了Utils的flushStackLocalLeaks方法。在Android 5.0之前,HandlerThread总是会保留最后一个Message的引用。为了避免长时间持有该引用而引起内存泄漏,该方法每隔1秒钟就给HandlerThread的Handler发送一个空Message。
static void flushStackLocalLeaks(Looper looper) { Handler handler = new Handler(looper) { @Override public void handleMessage(Message msg) { sendMessageDelayed(obtainMessage(), THREAD_LEAK_CLEANING_MS); } }; handler.sendMessageDelayed(handler.obtainMessage(), THREAD_LEAK_CLEANING_MS);}复制代码
NetworkBroadcastReceiver
Dispatcher在构造函数中注册了一个BroadcastReceiver来监听网络连接和飞行模式的变化。
static class NetworkBroadcastReceiver extends BroadcastReceiver { static final String EXTRA_AIRPLANE_STATE = "state"; private final Dispatcher dispatcher; NetworkBroadcastReceiver(Dispatcher dispatcher) { this.dispatcher = dispatcher; } void register() { IntentFilter filter = new IntentFilter(); // 监听飞行模式的变化 filter.addAction(ACTION_AIRPLANE_MODE_CHANGED); // 如果有监听网络的权限,就监听网络变化 if (dispatcher.scansNetworkChanges) { filter.addAction(CONNECTIVITY_ACTION); } dispatcher.context.registerReceiver(this, filter); } void unregister() { dispatcher.context.unregisterReceiver(this); } @Override public void onReceive(Context context, Intent intent) { if (intent == null) { return; } final String action = intent.getAction(); if (ACTION_AIRPLANE_MODE_CHANGED.equals(action)) { if (!intent.hasExtra(EXTRA_AIRPLANE_STATE)) { return; } // 处理飞行模式变化 dispatcher.dispatchAirplaneModeChange(intent.getBooleanExtra(EXTRA_AIRPLANE_STATE, false)); } else if (CONNECTIVITY_ACTION.equals(action)) { ConnectivityManager connectivityManager = getService(context, CONNECTIVITY_SERVICE); // 处理网络变化 dispatcher.dispatchNetworkStateChange(connectivityManager.getActiveNetworkInfo()); } }}复制代码
performSubmit
// 提交新的图片请求void performSubmit(Action action, boolean dismissFailed) { // 如果该Action对应的tag被暂停了,则把该Action和它对应的Target放入pausedActions中, // 以便对应的tag被resume时,重新从处理该请求 if (pausedTags.contains(action.getTag())) { pausedActions.put(action.getTarget(), action); if (action.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(), "because tag '" + action.getTag() + "' is paused"); } return; } // 判断该Action对应的key,是否存在对应的BitmapHunter。 // 如果有则调用对应的BitmapHunter的attach方法。 BitmapHunter hunter = hunterMap.get(action.getKey()); if (hunter != null) { hunter.attach(action); return; } // 如果线程池已经关闭了则返回。 if (service.isShutdown()) { if (action.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_IGNORED, action.request.logId(), "because shut down"); } return; } // 如果没有对应的BitmapHunter则创建一个,把新创建的BitmapHunter提交到线程池中,并把保持线程池返回的future。 // 把Aciton的key和创建的BitmapHunter,放入hunterMap中。 hunter = forRequest(action.getPicasso(), this, cache, stats, action); hunter.future = service.submit(hunter); hunterMap.put(action.getKey(), hunter); // 如果dismissFailed为true,则清除对应的失败Action。 if (dismissFailed) { failedActions.remove(action.getTarget()); } if (action.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_ENQUEUED, action.request.logId()); }}复制代码
performCancel
// 取消图片请求void performCancel(Action action) { String key = action.getKey(); BitmapHunter hunter = hunterMap.get(key); if (hunter != null) { // 如果该Action有对应的BitmapHunter,则detach该Action,并尝试取消BitmapHunter。 hunter.detach(action); // 成功取消了BitmapHunter,则把对应的BitmapHunter从hunterMap中清除。 if (hunter.cancel()) { hunterMap.remove(key); if (action.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_CANCELED, action.getRequest().logId()); } } } // 如果该Action已经被暂停,则把它从pausedActions清除。 if (pausedTags.contains(action.getTag())) { pausedActions.remove(action.getTarget()); if (action.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_CANCELED, action.getRequest().logId(), "because paused request got canceled"); } } // 把该Action从failedActions中清除。 Action remove = failedActions.remove(action.getTarget()); if (remove != null && remove.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_CANCELED, remove.getRequest().logId(), "from replaying"); }}复制代码
performPauseTag
// 暂停请求void performPauseTag(Object tag) { // 该tag已经被暂停了,则返回 if (!pausedTags.add(tag)) { return; } // 迭代hunterMap,查找并detach该tag对应的action,然后把它放入pausedActions中。 for (Iteratorit = hunterMap.values().iterator(); it.hasNext();) { BitmapHunter hunter = it.next(); boolean loggingEnabled = hunter.getPicasso().loggingEnabled; Action single = hunter.getAction(); List joined = hunter.getActions(); boolean hasMultiple = joined != null && !joined.isEmpty(); if (single == null && !hasMultiple) { continue; } if (single != null && single.getTag().equals(tag)) { hunter.detach(single); pausedActions.put(single.getTarget(), single); if (loggingEnabled) { log(OWNER_DISPATCHER, VERB_PAUSED, single.request.logId(), "because tag '" + tag + "' was paused"); } } if (hasMultiple) { for (int i = joined.size() - 1; i >= 0; i--) { Action action = joined.get(i); if (!action.getTag().equals(tag)) { continue; } hunter.detach(action); pausedActions.put(action.getTarget(), action); if (loggingEnabled) { log(OWNER_DISPATCHER, VERB_PAUSED, action.request.logId(), "because tag '" + tag + "' was paused"); } } } // 在detach对应的Action后,尝试取消BitmapHunter。如果成功,则把它从hunter中移除 if (hunter.cancel()) { it.remove(); if (loggingEnabled) { log(OWNER_DISPATCHER, VERB_CANCELED, getLogIdsForHunter(hunter), "all actions paused"); } } }}复制代码
performResumeTag
// 恢复被暂停的请求void performResumeTag(Object tag) { // 把tag从pausedTag中移除 if (!pausedTags.remove(tag)) { return; } // 迭代pausedActions,查找tag对应的Action,从pausedActions中移除,然后让Dispatcher批量恢复 Listbatch = null; for (Iterator i = pausedActions.values().iterator(); i.hasNext();) { Action action = i.next(); if (action.getTag().equals(tag)) { if (batch == null) { batch = new ArrayList (); } batch.add(action); i.remove(); } } if (batch != null) { mainThreadHandler.sendMessage(mainThreadHandler.obtainMessage(REQUEST_BATCH_RESUME, batch)); }}复制代码
performBatchComplete
// 批量处理完成的请求void performBatchComplete() { // 复制batch Listcopy = new ArrayList (batch); // 清空batch batch.clear(); // 把复制后的BitmapHunter交给主线程的Handler处理 mainThreadHandler.sendMessage(mainThreadHandler.obtainMessage(HUNTER_BATCH_COMPLETE, copy)); logBatch(copy);}复制代码
performRetry
// 处理BitmapHunter重试请求void performRetry(BitmapHunter hunter) { // 如果BitmapHunter被取消,则返回。 if (hunter.isCancelled()) return; // 如果线程池已关闭,则发送error消息,然后返回。 if (service.isShutdown()) { performError(hunter, false); return; } NetworkInfo networkInfo = null; if (scansNetworkChanges) { ConnectivityManager connectivityManager = getService(context, CONNECTIVITY_SERVICE); networkInfo = connectivityManager.getActiveNetworkInfo(); } boolean hasConnectivity = networkInfo != null && networkInfo.isConnected(); boolean shouldRetryHunter = hunter.shouldRetry(airplaneMode, networkInfo); boolean supportsReplay = hunter.supportsReplay(); if (!shouldRetryHunter) { boolean willReplay = scansNetworkChanges && supportsReplay; // 如果当前的飞行模式和网络情况不允许马上重试,则发送error消息。 performError(hunter, willReplay); // 如果支持后面重发,则做标记。 if (willReplay) { markForReplay(hunter); } return; } // 如果没有权限检测网络或者当前网络是连接的,则马上重发请求。 if (!scansNetworkChanges || hasConnectivity) { if (hunter.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_RETRYING, getLogIdsForHunter(hunter)); } //noinspection ThrowableResultOfMethodCallIgnored if (hunter.getException() instanceof NetworkRequestHandler.ContentLengthException) { // 如果是因为发生了ContentLengthException,则把网络策略改为不读本地缓存。 hunter.networkPolicy |= NetworkPolicy.NO_CACHE.index; } hunter.future = service.submit(hunter); return; } // 其他情况,比如不运行马上重试或者网络断开的,则发送error消息。 performError(hunter, supportsReplay); // 如果允许后面重发的则做标记。 if (supportsReplay) { markForReplay(hunter); }}// 把BitmapHunter中的ActionwillReplay标记为true。private void markForReplay(BitmapHunter hunter) { Action action = hunter.getAction(); if (action != null) { markForReplay(action); } Listjoined = hunter.getActions(); if (joined != null) { for (int i = 0, n = joined.size(); i < n; i++) { Action join = joined.get(i); markForReplay(join); } }}// 把Action的willReplay标记为true,并加入failedActions中等待重新提交执行private void markForReplay(Action action) { Object target = action.getTarget(); if (target != null) { action.willReplay = true; 
failedActions.put(target, action); }}复制代码
performComplete
// 处理BitmapHunter执行完成事件void performComplete(BitmapHunter hunter) { // 如果根据内存策略需要缓存到内存,则把Bitmap缓存到内存中。 if (shouldWriteToMemoryCache(hunter.getMemoryPolicy())) { cache.set(hunter.getKey(), hunter.getResult()); } // 从hunterMap中移除 hunterMap.remove(hunter.getKey()); // 把BitmapHunter加入batch中,等待批量处理。 batch(hunter); if (hunter.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_BATCHED, getLogIdsForHunter(hunter), "for completion"); }}private void batch(BitmapHunter hunter) { // 如果BitmapHunter已经被取消了,则返回 if (hunter.isCancelled()) { return; } // 把BitmapHunter添加到batch中 batch.add(hunter); // 每隔200毫秒把batch提交给handler批量处理 if (!handler.hasMessages(HUNTER_DELAY_NEXT_BATCH)) { handler.sendEmptyMessageDelayed(HUNTER_DELAY_NEXT_BATCH, BATCH_DELAY); }}复制代码
performError
// 处理BitmapHunter执行过程中发生的错误void performError(BitmapHunter hunter, boolean willReplay) { if (hunter.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_BATCHED, getLogIdsForHunter(hunter), "for error" + (willReplay ? " (will replay)" : "")); } // 把BitmapHunter从hunterMap中移除 hunterMap.remove(hunter.getKey()); // 添加到batch batch(hunter);}private void batch(BitmapHunter hunter) { // 如果BitmapHunter已经被取消了,则返回 if (hunter.isCancelled()) { return; } // 把BitmapHunter添加到batch中 batch.add(hunter); // 每隔200毫秒把batch提交给handler批量处理 if (!handler.hasMessages(HUNTER_DELAY_NEXT_BATCH)) { handler.sendEmptyMessageDelayed(HUNTER_DELAY_NEXT_BATCH, BATCH_DELAY); }}复制代码
performNetworkStateChange
// 处理网络变化,重新提交因为网络问题而停止的Actionvoid performNetworkStateChange(NetworkInfo info) { // 如果线程池是默认的PicassoExecutorService,则根据网络情况调整线程数量。 if (service instanceof PicassoExecutorService) { ((PicassoExecutorService) service).adjustThreadCount(info); } // 如果网络是连接的,则重新提交失败的Action。 if (info != null && info.isConnected()) { flushFailedActions(); }}private void flushFailedActions() { // 迭代failedActions,移除Action,并重新提交Action。 if (!failedActions.isEmpty()) { Iteratoriterator = failedActions.values().iterator(); while (iterator.hasNext()) { Action action = iterator.next(); iterator.remove(); if (action.getPicasso().loggingEnabled) { log(OWNER_DISPATCHER, VERB_REPLAYING, action.getRequest().logId()); } performSubmit(action, false); } }}复制代码