跳到主要内容

Koa.js 原理

学习准备

Promise 使用

async/await 使用

Node.js原生http模块

中间件引擎

前言

在使用Koa.js过程中,会发现中间件的使用都是这样子的,如以下代码所示。

const Koa = require('koa');
let app = new Koa();

const middleware1 = async (ctx, next) => {
console.log(1);
await next();
console.log(6);
};

const middleware2 = async (ctx, next) => {
console.log(2);
await next();
console.log(5);
};

const middleware3 = async (ctx, next) => {
console.log(3);
await next();
console.log(4);
};

app.use(middleware1);
app.use(middleware2);
app.use(middleware3);
app.use(async (ctx, next) => {
ctx.body = 'hello world';
});

app.listen(3001);

// 启动访问浏览器
// 控制台会出现以下结果
// 1
// 2
// 3
// 4
// 5
// 6

为什么会出现以上的结果, 这个主要是Koa.js的一个中间件引擎 koa-compose模块来实现的,也就是Koa.js实现洋葱模型的核心引擎。

中间件原理

洋葱模型可以看出,中间件的在 await next() 前后的操作,很像数据结构的一种场景——“栈”,先进后出。同时,又有统一上下文管理操作数据。综上所述,可以总结出一下特性。

  • 有统一 context
  • 操作先进后出
  • 有控制先进后出的机制 next
  • 有提前结束机制

这样子我们可以单纯用 Promise 做个简单的实现如下:

let context = {
data: []
};

async function middleware1(ctx, next) {
console.log('action 001');
ctx.data.push(1);
await next();
console.log('action 006');
ctx.data.push(6);
}

async function middleware2(ctx, next) {
console.log('action 002');
ctx.data.push(2);
await next();
console.log('action 005');
ctx.data.push(5);
}

async function middleware3(ctx, next) {
console.log('action 003');
ctx.data.push(3);
await next();
console.log('action 004');
ctx.data.push(4);
}

Promise.resolve(
middleware1(context, async () => {
return Promise.resolve(
middleware2(context, async () => {
return Promise.resolve(
middleware3(context, async () => {
return Promise.resolve();
})
);
})
);
})
).then(() => {
console.log('end');
console.log('context = ', context);
});

// 结果显示
// "action 001"
// "action 002"
// "action 003"
// "action 004"
// "action 005"
// "action 006"
// "end"
// "context = { data: [1, 2, 3, 4, 5, 6]}"

引擎实现

通过上一节中的中间件原理,可以看出,单纯用Promise 嵌套可以直接实现中间件流程。虽然可以实现,但是Promise嵌套会产生代码的可读性和可维护性的问题,也带来了中间件扩展问题。

所以需要把Promise 嵌套实现的中间件方式进行高度抽象,达到可以自定义中间件的层数。这时候需要借助前面几章提到的处理 Promise嵌套的神器async/await

我们先理清楚需要的步骤

  • 中间件队列
  • 处理中间件队列,并将上下文context传进去
  • 中间件的流程控制器next
  • 异常处理

根据上一节分析中间的原理,我们可以抽象出

  • 每一个中间件需要封装一个 Promise
  • 洋葱模型的先进后出操作,对应Promise.resolve的前后操作
function compose(middleware) {
// 中间件数组
if (!Array.isArray(middleware)) {
throw new TypeError('Middleware stack must be an array!');
}

return function (ctx, next) {
let index = -1;

return dispatch(0);

function dispatch(i) {
if (i < index) {
return Promise.reject(new Error('next() called multiple times'));
}
index = i;

let fn = middleware[i];

// 执行到最后,将自身的回调赋值给 fn
if (i === middleware.length) {
fn = next;
}

// 数组的 lenth 项为 undefined,返回 Promise
if (!fn) {
return Promise.resolve();
}

try {
// 执行过程中,依次执行每一个中间件,并将上下文传递到下一个中间件
return Promise.resolve(
fn(ctx, () => {
return dispatch(i + 1);
})
);
} catch (err) {
return Promise.reject(err);
}
}
};
}

试用中间件引擎


let middleware = [];
let context = {
data: []
};

middleware.push(async (ctx, next) => {
console.log('action 001');
ctx.data.push(1);
await next();
console.log('action 006');
ctx.data.push(6);
});

middleware.push(async (ctx, next) => {
console.log('action 002');
ctx.data.push(2);
await next();
console.log('action 005');
ctx.data.push(5);
});

middleware.push(async (ctx, next) => {
console.log('action 003');
ctx.data.push(3);
await next();
console.log('action 004');
ctx.data.push(4);
});

const fn = compose(middleware);

fn(context).then(() => {
console.log('end');
console.log('context = ', context);
});

// 结果显示
// "action 001"
// "action 002"
// "action 003"
// "action 004"
// "action 005"
// "action 006"
// "end"
// "context = { data: [1, 2, 3, 4, 5, 6]}"

普通中间件式HTTP服务实现

前言

用过Express.jsKoa.js的人会发现使用方式很类似,也是基于中间件的理念去实现Web服务。

直接以Express.js回调式的中间件服务比较容易理解。再基于回调式的中间件服务接入Koa.js的中间件引擎去处理回调嵌套的处理。

这一章主要以原生的Node.js实现纯回调的中间件HTTP服务。

必要条件

  • 内置中间件队列
  • 中间件遍历机制
  • 异常处理机制

最简实现

  • demo源码

https://github.com/chenshenhai/koajs-design-note/tree/master/demo/chapter-01-06

  • 服务类封装
const http = require('http');
const Emitter = require('events');

class WebServer extends Emitter {

constructor() {
super();
this.middleware = [];
this.context = Object.create({});
}

/**
* 服务事件监听
* @param {*} args
*/
listen(...args) {
const server = http.createServer(this.callback());
return server.listen(...args);
}

/**
* 注册使用中间件
* @param {Function} fn
*/
use(fn) {
if (typeof fn === 'function') {
this.middleware.push(fn);
}
}

/**
* 中间件总回调方法
*/
callback() {
let that = this;

if (this.listeners('error').length === 0) {
this.on('error', this.onerror);
}

const handleRequest = (req, res) => {
let context = that.createContext(req, res);
this.middleware.forEach((cb, idx) => {
try {
cb(context);
} catch (err) {
that.onerror(err);
}

if (idx + 1 >= this.middleware.length) {
if (res && typeof res.end === 'function') {
res.end();
}
}
});
};
return handleRequest;
}

/**
* 异常处理监听
* @param {EndOfStreamError} err
*/
onerror(err) {
console.log(err);
}

/**
* 创建通用上下文
* @param {Object} req
* @param {Object} res
*/
createContext(req, res) {
let context = Object.create(this.context);
context.req = req;
context.res = res;
return context;
}
}

module.exports = WebServer;
  • 服务使用
const WebServer = require('./index');

const app = new WebServer();
const PORT = 3001;

app.use(ctx => {
ctx.res.write('<p>line 1</p>');
});

app.use(ctx => {
ctx.res.write('<p>line 2</p>');
});

app.use(ctx => {
ctx.res.write('<p>line 3</p>');
});

app.listen(PORT, () => {
console.log(`the web server is starting at port ${PORT}`);
});

最简Koa.js实现

前言

从上一章可以看到最简单的中间件式HTTP服务的实现,底层是基于回调嵌套去处理中间件队列。

/**
* 中间件总回调方法
*/
callback() {
let that = this;

if (this.listeners('error').length === 0) {
this.on('error', this.onerror);
}

const handleRequest = (req, res) => {
let context = that.createContext(req, res);
this.middleware.forEach((cb, idx) => {
try {
cb(context);
} catch (err) {
that.onerror(err);
}

if (idx + 1 >= this.middleware.length) {
if (res && typeof res.end === 'function') {
res.end();
}
}
});
};
return handleRequest;
}

但是中间件越多,回调嵌套越深,代码的可读性和可扩展性就很差,所以这时候把回调嵌套转化成 Promise + async/await ,这个时候就转变成最简单的Koa.js实现。

必要条件

  • 通过上下文赋值可代替 res.end()
  • 洋葱模型的中间件机制

源码实现

  • demo源码

https://github.com/chenshenhai/koajs-design-note/tree/master/demo/chapter-01-07

// compose.js
module.exports = compose;

function compose(middleware) {
if (!Array.isArray(middleware)) {
throw new TypeError('Middleware stack must be an array!');
}

return function (ctx, next) {
let index = -1;

return dispatch(0);

function dispatch(i) {
if (i < index) {
return Promise.reject(new Error('next() called multiple times'));
}
index = i;

let fn = middleware[i];

if (i === middleware.length) {
fn = next;
}

if (!fn) {
return Promise.resolve();
}

try {
return Promise.resolve(
fn(ctx, () => {
return dispatch(i + 1);
})
);
} catch (err) {
return Promise.reject(err);
}
}
};
}
// index.js
const http = require('http');
const Emitter = require('events');
// 注意:这里的compose是前几章的中间件引擎源码
const compose = require('./../compose');

/**
* 通用上下文
*/
const context = {
_body: null,

get body() {
return this._body;
},

set body(val) {
this._body = val;
this.res.end(this._body);
}
};

class SimpleKoa extends Emitter {
constructor() {
super();
this.middleware = [];
this.context = Object.create(context);
}

/**
* 服务事件监听
* @param {*} args
*/
listen(...args) {
const server = http.createServer(this.callback());
return server.listen(...args);
}

/**
* 注册使用中间件
* @param {Function} fn
*/
use(fn) {
if (typeof fn === 'function') {
this.middleware.push(fn);
}
}

/**
* 中间件总回调方法
*/
callback() {
if (this.listeners('error').length === 0) {
this.on('error', this.onerror);
}

const handleRequest = (req, res) => {
let context = this.createContext(req, res);
let middleware = this.middleware;
// 执行中间件
compose(middleware)(context).catch(err => this.onerror(err));
};
return handleRequest;
}

/**
* 异常处理监听
* @param {EndOfStreamError} err
*/
onerror(err) {
console.log(err);
}

/**
* 创建通用上下文
* @param {Object} req
* @param {Object} res
*/
createContext(req, res) {
let context = Object.create(this.context);
context.req = req;
context.res = res;
return context;
}
}

module.exports = SimpleKoa;

  • 执行例子
const SimpleKoa = require('./index');

const app = new SimpleKoa();
const PORT = 3001;

app.use(async ctx => {
ctx.body = '<p>this is a body</p>';
});

app.listen(PORT, () => {
console.log(`the web server is starting at port ${PORT}`);
});