...
 
const kOnMessage = Symbol('onMessage')
const kChildren = Symbol('children')

// FIFO cache bounded by a total "length" budget rather than an entry count.
// Existing keys are not overwritten: the first write for a key wins.
class FIFO {
  map = new Map()
  list = []
  occupiedLength = 0

  constructor ({maxLength}) {
    this.maxLength = maxLength
  }

  set (key, value, length) {
    if (this.map.has(key)) return
    if (length > this.maxLength) return
    // Evict the oldest entries until the new one fits within the budget.
    while (length > this.maxLength - this.occupiedLength) {
      const head = this.list.shift()
      this.map.delete(head.key)
      this.occupiedLength -= head.length
    }
    const entry = {key, value, length}
    this.list.push(entry)
    this.map.set(key, entry)
    this.occupiedLength += length
  }

  get (key) {
    if (this.map.has(key)) {
      return this.map.get(key).value
    }
  }
}
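
// Illustration only (hypothetical keys, values and sizes): with maxLength = 10,
// inserting three entries of length 4 evicts the oldest entry before the third
// insert, so occupiedLength never exceeds the budget.
//
//   const fifo = new FIFO({maxLength: 10})
//   fifo.set('a', bufferA, 4)
//   fifo.set('b', bufferB, 4)
//   fifo.set('c', bufferC, 4) // 'a' is evicted to make room
//   fifo.get('a')             // undefined
//   fifo.get('c')             // bufferC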

export class ClusterSharedCacheServer {
  // The semicolon keeps the computed field below from being parsed as an
  // index expression into this Map.
  caches = new Map();
  [kChildren] = new Map()

  createCache (namespace, type, options) {
    if (this.caches.has(namespace)) {
      throw new Error('Namespace conflict')
    }
    switch (type) {
      case 'fifo':
        this.caches.set(namespace, new FIFO(options))
        break
      default:
        // Unknown cache types are silently ignored.
    }
  }

  // Subscribes to cache messages coming from the given child processes.
  listen (children) {
    for (const child of children) {
      if (this[kChildren].has(child)) continue
      const listener = ({
        clusterSharedCacheSet,
        clusterSharedCacheGetRequest
      }) => {
        if (clusterSharedCacheSet) {
          const {namespace, key, value, length} = clusterSharedCacheSet
          if (this.caches.has(namespace)) {
            this.caches.get(namespace).set(key, value, length)
          }
        } else if (clusterSharedCacheGetRequest) {
          const {namespace, key} = clusterSharedCacheGetRequest
          if (this.caches.has(namespace)) {
            const value = this.caches.get(namespace).get(key)
            child.send({
              clusterSharedCacheGetResponse: {
                namespace,
                key,
                value
              }
            })
          }
        }
      }
      // Remember the listener so mute() can remove it later.
      this[kChildren].set(child, listener)
      child.on('message', listener)
    }
  }

  // Detaches from child processes previously passed to listen().
  mute (children) {
    for (const child of children) {
      if (this[kChildren].has(child)) {
        const listener = this[kChildren].get(child)
        child.removeListener('message', listener)
        this[kChildren].delete(child)
      }
    }
  }
}
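
// Master-side wiring sketch (assumes `cluster` from the 'cluster' module and a
// `workers` array of forked workers; the size budget is illustrative):
//
//   const cacheServer = new ClusterSharedCacheServer()
//   cacheServer.createCache('tlsSessions', 'fifo', {maxLength: 1e6})
//   cacheServer.listen(workers)
//   // Detach exited workers so their message listeners are released:
//   cluster.on('exit', (worker) => cacheServer.mute([worker]))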

export class ClusterSharedCacheClient {
  pending = new Map()

  constructor (namespace, options) {
    this.namespace = namespace
    this.options = options
  }

  // Starts listening for responses relayed back by the master process.
  connect () {
    if (!this[kOnMessage]) {
      const listener = ({clusterSharedCacheGetResponse}) => {
        if (clusterSharedCacheGetResponse) {
          const {key, value, namespace} = clusterSharedCacheGetResponse
          if (namespace === this.namespace && this.pending.has(key)) {
            const callbacks = this.pending.get(key)
            this.pending.delete(key)
            for (const callback of callbacks) {
              process.nextTick(callback, value)
            }
          }
        }
      }
      this[kOnMessage] = listener
      process.on('message', listener)
    }
  }

  disconnect () {
    if (this[kOnMessage]) {
      process.removeListener('message', this[kOnMessage])
      this[kOnMessage] = null
    }
  }

  get (key) {
    return new Promise((resolve) => {
      if (this.pending.has(key)) {
        // A request for this key is already in flight; share its response.
        this.pending.get(key).push(resolve)
      } else {
        this.pending.set(key, [resolve])
        process.send({
          clusterSharedCacheGetRequest: {
            namespace: this.namespace,
            key
          }
        })
      }
    })
  }

  // Forwards the entry to the master; options.lengthOf(value, key) supplies its size.
  set (key, value) {
    const length = this.options.lengthOf(value, key)
    process.send({
      clusterSharedCacheSet: {
        namespace: this.namespace,
        key,
        value,
        length
      }
    })
  }
}
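
For context, a minimal worker-side sketch of the client defined above (the namespace, key, value and lengthOf are illustrative; the master must have created the namespace with createCache, and lengthOf(value, key) must return sizes in the same units as the server's maxLength):

import {ClusterSharedCacheClient} from './helpers/ClusterSharedCache'

async function example () {
  const cache = new ClusterSharedCacheClient('exampleNamespace', {
    lengthOf: (value) => Buffer.byteLength(JSON.stringify(value))
  })
  cache.connect() // subscribe to responses relayed back by the master
  cache.set('exampleKey', {hello: 'world'})
  const value = await cache.get('exampleKey') // resolves when the master responds
  cache.disconnect()
  return value
}

Note that get() resolves with undefined for a missing key, and never settles if the namespace was not created on the master, so callers may want their own timeout.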
...
@@ -3,6 +3,7 @@ import physicalCpuCount from 'physical-cpu-count'
 import eventToPromise from 'event-to-promise'
 import {watch} from 'chokidar'
 import debounce from 'debounce-collect'
+import {ClusterSharedCacheServer} from './helpers/ClusterSharedCache'
 export async function master (argv) {
   const workers = []
...
@@ -12,6 +13,14 @@ export async function master (argv) {
   workers.forEach((worker) => worker.send({argv}))
   await Promise.all(workers.map((worker) => eventToPromise(worker, 'listening')))
+  const clusterSharedCacheServer = new ClusterSharedCacheServer()
+  clusterSharedCacheServer.createCache(
+    'tlsSessions',
+    'fifo',
+    {maxLength: Math.min(1e6, freemem() / 100)}
+  )
+  clusterSharedCacheServer.listen(workers)
   const broadcastExpireToWorkers = debounce(function (paths) {
     for (const path of new Set(paths)) {
       for (const worker of workers) {
...
...
@@ -2,6 +2,7 @@ import 'babel-polyfill'
 import {readFileSync} from 'fs'
 import {app} from './app'
 import ocsp from 'ocsp'
+import {ClusterSharedCacheClient} from './helpers/ClusterSharedCache'
 const http2 = require('http2')
...
@@ -13,19 +14,31 @@ export function server (options) {
     ca: options.ca.map((file) => readFileSync(file))
   }, requestListener)
-  const cache = new ocsp.Cache()
-  server.on('OCSPRequest', function (cert, issuer, cb) {
-    if (!issuer) return cb()
-    ocsp.getOCSPURI(cert, function (err, uri) {
-      if (err) return cb(err)
-      var req = ocsp.request.generate(cert, issuer)
+  const ocspCache = new ocsp.Cache()
+  server.on('OCSPRequest', (certificate, issuer, callback) => {
+    if (!issuer) return callback()
+    ocsp.getOCSPURI(certificate, (error, uri) => {
+      if (error) return callback(error)
+      var request = ocsp.request.generate(certificate, issuer)
       var options = {
         url: uri,
-        ocsp: req.data
+        ocsp: request.data
       }
-      cache.request(req.id, options, cb)
+      ocspCache.request(request.id, options, callback)
     })
   })
+  const sessionCache = new ClusterSharedCacheClient(
+    'tlsSessions',
+    {lengthOf: ({byteLength}) => byteLength}
+  )
+  server.on('newSession', (sessionId, sessionData, callback) => {
+    sessionCache.set(sessionId, sessionData)
+    callback()
+  })
+  server.on('resumeSession', async (sessionId, callback) => {
+    const cached = await sessionCache.get(sessionId)
+    callback(null, cached || null)
+  })
   return server
 }
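
A caveat on the session handlers above, offered as a sketch rather than as part of the commit: sessionId and sessionData are Buffers, and the default child-process IPC channel serializes messages as JSON, so a Buffer key will not compare equal inside the master's Map and a Buffer value comes back as a plain {type: 'Buffer', data: [...]} object. One way to normalize this, assuming the same sessionCache as above:

server.on('newSession', (sessionId, sessionData, callback) => {
  // Hex string keys survive JSON serialization and work as Map keys on the master.
  sessionCache.set(sessionId.toString('hex'), sessionData)
  callback()
})
server.on('resumeSession', async (sessionId, callback) => {
  const cached = await sessionCache.get(sessionId.toString('hex'))
  // After the JSON round trip the session data arrives as {type: 'Buffer', data: [...]}.
  callback(null, cached ? Buffer.from(cached.data) : null)
})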