// task.js
  1. var session = require('./session');
  2. var util = require('./util');
  3. var originApiMap = {};
  4. var transferToTaskMethod = function (apiMap, apiName) {
  5. originApiMap[apiName] = apiMap[apiName];
  6. apiMap[apiName] = function (params, callback) {
  7. if (params.SkipTask) {
  8. originApiMap[apiName].call(this, params, callback);
  9. } else {
  10. this._addTask(apiName, params, callback);
  11. }
  12. };
  13. };
  14. var initTask = function (cos) {
  15. var queue = [];
  16. var tasks = {};
  17. var uploadingFileCount = 0;
  18. var nextUploadIndex = 0;
  19. // 接口返回简略的任务信息
  20. var formatTask = function (task) {
  21. var t = {
  22. id: task.id,
  23. Bucket: task.Bucket,
  24. Region: task.Region,
  25. Key: task.Key,
  26. FilePath: task.FilePath,
  27. state: task.state,
  28. loaded: task.loaded,
  29. size: task.size,
  30. speed: task.speed,
  31. percent: task.percent,
  32. hashPercent: task.hashPercent,
  33. error: task.error,
  34. };
  35. if (task.FilePath) t.FilePath = task.FilePath;
  36. return t;
  37. };
  38. var emitListUpdate = (function () {
  39. var timer;
  40. var emit = function () {
  41. timer = 0;
  42. cos.emit('task-list-update', { list: util.map(queue, formatTask) });
  43. cos.emit('list-update', { list: util.map(queue, formatTask) });
  44. };
  45. return function () {
  46. if (!timer) timer = setTimeout(emit);
  47. };
  48. })();
  49. var clearQueue = function () {
  50. if (queue.length <= cos.options.UploadQueueSize) return;
  51. for (
  52. var i = 0;
  53. i < nextUploadIndex && // 小于当前操作的 index 才清理
  54. i < queue.length && // 大于队列才清理
  55. queue.length > cos.options.UploadQueueSize; // 如果还太多,才继续清理
  56. ) {
  57. var isActive = queue[i].state === 'waiting' || queue[i].state === 'checking' || queue[i].state === 'uploading';
  58. if (!queue[i] || !isActive) {
  59. tasks[queue[i].id] && delete tasks[queue[i].id];
  60. queue.splice(i, 1);
  61. nextUploadIndex--;
  62. } else {
  63. i++;
  64. }
  65. }
  66. emitListUpdate();
  67. };
  68. var startNextTask = function () {
  69. // 检查是否允许增加执行进程
  70. if (uploadingFileCount >= cos.options.FileParallelLimit) return;
  71. // 跳过不可执行的任务
  72. while (queue[nextUploadIndex] && queue[nextUploadIndex].state !== 'waiting') nextUploadIndex++;
  73. // 检查是否已遍历结束
  74. if (nextUploadIndex >= queue.length) return;
  75. // 上传该遍历到的任务
  76. var task = queue[nextUploadIndex];
  77. nextUploadIndex++;
  78. uploadingFileCount++;
  79. task.state = 'checking';
  80. task.params.onTaskStart && task.params.onTaskStart(formatTask(task));
  81. !task.params.UploadData && (task.params.UploadData = {});
  82. var apiParams = util.formatParams(task.api, task.params);
  83. originApiMap[task.api].call(cos, apiParams, function (err, data) {
  84. if (!cos._isRunningTask(task.id)) return;
  85. if (task.state === 'checking' || task.state === 'uploading') {
  86. task.state = err ? 'error' : 'success';
  87. err && (task.error = err);
  88. uploadingFileCount--;
  89. emitListUpdate();
  90. startNextTask();
  91. task.callback && task.callback(err, data);
  92. if (task.state === 'success') {
  93. if (task.params) {
  94. delete task.params.UploadData;
  95. delete task.params.Body;
  96. delete task.params;
  97. }
  98. delete task.callback;
  99. }
  100. }
  101. clearQueue();
  102. });
  103. emitListUpdate();
  104. // 异步执行下一个任务
  105. setTimeout(startNextTask);
  106. };
  107. var killTask = function (id, switchToState) {
  108. var task = tasks[id];
  109. if (!task) return;
  110. var waiting = task && task.state === 'waiting';
  111. var running = task && (task.state === 'checking' || task.state === 'uploading');
  112. if (
  113. (switchToState === 'canceled' && task.state !== 'canceled') ||
  114. (switchToState === 'paused' && waiting) ||
  115. (switchToState === 'paused' && running)
  116. ) {
  117. if (switchToState === 'paused' && task.params.Body && typeof task.params.Body.pipe === 'function') {
  118. console.error('stream not support pause');
  119. return;
  120. }
  121. task.state = switchToState;
  122. cos.emit('inner-kill-task', { TaskId: id, toState: switchToState });
  123. try {
  124. var UploadId = task && task.params && task.params.UploadData.UploadId;
  125. } catch (e) {}
  126. if (switchToState === 'canceled' && UploadId) session.removeUsing(UploadId);
  127. emitListUpdate();
  128. if (running) {
  129. uploadingFileCount--;
  130. startNextTask();
  131. }
  132. if (switchToState === 'canceled') {
  133. if (task.params) {
  134. delete task.params.UploadData;
  135. delete task.params.Body;
  136. delete task.params;
  137. }
  138. delete task.callback;
  139. }
  140. }
  141. clearQueue();
  142. };
  143. cos._addTasks = function (taskList) {
  144. util.each(taskList, function (task) {
  145. cos._addTask(task.api, task.params, task.callback, true);
  146. });
  147. emitListUpdate();
  148. };
  149. cos._addTask = function (api, params, callback, ignoreAddEvent) {
  150. // 如果小程序版本不支持获取文件分片内容,统一转到 简单上传 接口上传
  151. var simpleUploadMethod = cos.options.SimpleUploadMethod === 'postObject' ? 'postObject' : 'putObject';
  152. if (api === 'sliceUploadFile' && !util.canFileSlice()) api = simpleUploadMethod;
  153. // 复制参数对象
  154. params = util.formatParams(api, params);
  155. // 生成 id
  156. var id = util.uuid();
  157. params.TaskId = id;
  158. params.onTaskReady && params.onTaskReady(id);
  159. var task = {
  160. // env
  161. params: params,
  162. callback: callback,
  163. api: api,
  164. index: queue.length,
  165. // task
  166. id: id,
  167. Bucket: params.Bucket,
  168. Region: params.Region,
  169. Key: params.Key,
  170. FilePath: params.FilePath || '',
  171. state: 'waiting',
  172. loaded: 0,
  173. size: 0,
  174. speed: 0,
  175. percent: 0,
  176. hashPercent: 0,
  177. error: null,
  178. };
  179. var onHashProgress = params.onHashProgress;
  180. params.onHashProgress = function (info) {
  181. if (!cos._isRunningTask(task.id)) return;
  182. task.hashPercent = info.percent;
  183. onHashProgress && onHashProgress(info);
  184. emitListUpdate();
  185. };
  186. var onProgress = params.onProgress;
  187. params.onProgress = function (info) {
  188. if (!cos._isRunningTask(task.id)) return;
  189. task.state === 'checking' && (task.state = 'uploading');
  190. task.loaded = info.loaded;
  191. task.size = info.total;
  192. task.speed = info.speed;
  193. task.percent = info.percent;
  194. onProgress && onProgress(info);
  195. emitListUpdate();
  196. };
  197. // 异步获取 filesize
  198. util.getFileSize(api, params, function (err, size) {
  199. // 开始处理上传
  200. if (err) {
  201. // 如果获取大小出错,不加入队列
  202. callback(err);
  203. return;
  204. }
  205. // 获取完文件大小再把任务加入队列
  206. tasks[id] = task;
  207. queue.push(task);
  208. task.size = size;
  209. !ignoreAddEvent && emitListUpdate();
  210. startNextTask();
  211. clearQueue();
  212. });
  213. return id;
  214. };
  215. cos._isRunningTask = function (id) {
  216. var task = tasks[id];
  217. return !!(task && (task.state === 'checking' || task.state === 'uploading'));
  218. };
  219. cos.getTaskList = function () {
  220. return util.map(queue, formatTask);
  221. };
  222. cos.cancelTask = function (id) {
  223. killTask(id, 'canceled');
  224. };
  225. cos.pauseTask = function (id) {
  226. killTask(id, 'paused');
  227. };
  228. cos.restartTask = function (id) {
  229. var task = tasks[id];
  230. if (task && (task.state === 'paused' || task.state === 'error')) {
  231. task.state = 'waiting';
  232. emitListUpdate();
  233. nextUploadIndex = Math.min(nextUploadIndex, task.index);
  234. startNextTask();
  235. }
  236. };
  237. cos.isUploadRunning = function () {
  238. return uploadingFileCount || nextUploadIndex < queue.length;
  239. };
  240. };
  241. module.exports.transferToTaskMethod = transferToTaskMethod;
  242. module.exports.init = initTask;