如何实现Promise并发控制
在前端开发的面试环节中,经常会遇到这样一类题目:要求实现控制并发队列,给定异步任务和并发数量作为参数,完成特定的并发控制功能。不少小伙伴面对这类问题时会感到头疼,今天就带大家一步步从0到1实现Promise并发控制,用通俗易懂的方式讲解,保证大家都能看明白。
一、基础功能实现思路
(一)示例代码准备
首先,我们通过一个简单的delay
函数来模拟异步任务。这个函数接收两个参数,一个是要返回的文本text
,另一个是延迟的时间time
。它返回一个Promise
,在设定的延迟时间后,resolve
返回对应的文本。
function delay(text,time){ return new Promise((resolve,reject) => { setTimeout(() => { resolve(text) },time) }) }
接着,定义一些具体的异步任务p1
到p5
,它们分别返回不同的文本,并且延迟时间也不一样。
const p1 = () => delay('1',5000) const p2 = () => delay('2',2000) const p3 = () => delay('3',3000) const p4 = () => delay('4',2000) const p5 = () => delay('5',3000) const p = [p1,p2,p3,p4,p5]
(二)Queue类的初始化
我们要创建一个Queue
类来实现并发控制。在初始化这个类的时候,需要设置异步任务数组和并发数量limit
。在constructor
构造函数里,除了把传入的参数赋值给类的属性外,还要从异步任务数组的开头取出limit
个任务,添加到一个任务队列queue
里,并且默认直接开始执行这些异步任务。
class Queue { constructor(initArr, limit) { this.queue = []; this.limit = limit; this.lists = initArr; for (let i = 0; i < this.limit; i++) { this.queue.push(this.lists.shift()) } this.work(); } work() { // 后续会定义这个函数的具体功能 } limit = 0; queue = []; lists = []; }
(三)work函数的功能实现
work
函数的作用是执行当前任务队列queue
里的所有任务。在执行过程中,每当有一个异步任务完成,就从剩余的异步任务数组lists
里取出第一个任务,添加到任务队列queue
中。
pop() { if (this.lists.length > 0) { this.queue.push(this.lists.shift()); } } work() { while(this.queue.length > 0) { const requestFunc = this.queue.shift(); const request = requestFunc(); // 使用Promise.resolve确保request不是promise时,代码也能正常执行 Promise.resolve(request).finally(res => { this.pop(); this.work(); }) } }
这样,基础版本的Promise并发控制就实现啦,完整代码如下:
class Queue { constructor(initArr, limit) { this.queue = []; this.limit = limit; this.lists = initArr; for (let i = 0; i < this.limit; i++) { this.queue.push(this.lists.shift()) } this.work(); } pop() { if (this.lists.length > 0) { this.queue.push(this.lists.shift()); } } work() { while(this.queue.length > 0) { const requestFunc = this.queue.shift(); const request = requestFunc(); Promise.resolve(request).finally(res => { this.pop(); this.work(); }) } } limit = 0; queue = []; lists = []; }
二、进阶功能拓展
有时候,我们希望在使用过程中还能往任务队列里添加新的任务。比如,在已经创建了Queue
实例q
后,还能通过q.push(() => delay('6', 3000))
这样的方式添加新任务。这就需要对之前的代码进行升级。
(一)新增push方法
我们新增一个外部可以调用的push
方法。这个方法可以接收一个任务或者一个任务数组。如果传入的是数组,就把数组里的任务都添加到lists
数组里;如果传入的是单个任务,就直接把这个任务添加到lists
数组里。添加完任务后,调用pushQueue
方法来处理是否把新任务添加到任务队列queue
里。
push(item) { if (item instanceof Array) { this.lists.push(...item); } else { this.lists.push(item) } this.pushQueue() }
(二)pushQueue方法的定义
pushQueue
方法的作用是检查当前是否有条件把lists
数组里的任务添加到任务队列queue
中。如果lists
数组里有任务,并且当前正在执行的异步任务数量小于并发限制limit
,就把lists
里的任务添加到queue
里。这里新增了一个running
变量,用来记录当前正在执行的异步任务数量。
pushQueue() { if (this.lists.length > 0) { for (let i = 0; i < this.limit - this.running; i++) { const item = this.lists.shift() if (item) this.queue.push(item); } } }
(三)修改work函数
因为新增了running
变量,所以在work
函数里,当开始执行一个异步任务时,要把running
加1;当异步任务完成时,要把running
减1。同时,任务完成后不再直接调用pop
方法,而是调用pushQueue
方法来处理任务队列。
work() { while(this.queue.length > 0) { const requestFunc = this.queue.shift(); const request = requestFunc(); // 开始执行异步任务,running加1 this.running += 1; Promise.resolve(request).finally(res => { // 异步任务完成,running减1 this.running -= 1; // 通过pushQueue处理任务队列 this.pushQueue(); this.work(); }) } }
升级后的完整代码如下:
class Queue { constructor(initArr, limit) { this.queue = []; this.limit = limit; this.lists = initArr; this.pushQueue(); this.work(); } pushQueue() { if (this.lists.length > 0) { for (let i = 0; i < this.limit - this.running; i++) { const item = this.lists.shift() if (item) this.queue.push(item); } } } push(item) { if (item instanceof Array) { this.lists.push(...item); } else { this.lists.push(item) } this.pushQueue() } work() { while(this.queue.length > 0) { const requestFunc = this.queue.shift(); const request = requestFunc(); this.running += 1; Promise.resolve(request).finally(res => { this.running -= 1; this.pushQueue(); this.work(); }) } } limit = 0; queue = []; lists = []; running = 0 }
通过以上一步步的实现,无论是基础的Promise并发控制,还是进阶的动态添加任务功能,都能顺利完成啦。希望这篇文章能帮助大家更好地理解和掌握Promise并发控制的实现原理,在面试和实际开发中都能轻松应对相关问题。