From 824c0a1177e4e52d171888506862beac9f475088 Mon Sep 17 00:00:00 2001 From: Dreamacro <305009791@qq.com> Date: Fri, 26 Oct 2018 18:43:28 +0800 Subject: [PATCH] Add: StreamReader for stream api --- package-lock.json | 7 ++-- package.json | 1 + src/lib/streamer.ts | 91 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 4 deletions(-) create mode 100644 src/lib/streamer.ts diff --git a/package-lock.json b/package-lock.json index 32b9294..116b496 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3816,8 +3816,7 @@ "eventemitter3": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-3.1.0.tgz", - "integrity": "sha512-ivIvhpq/Y0uSjcHDcOIccjmYjGLcP09MFGE7ysAwkAvkXfpZlC985pH2/ui64DKazbTW/4kN3yqozUxlXzI6cA==", - "dev": true + "integrity": "sha512-ivIvhpq/Y0uSjcHDcOIccjmYjGLcP09MFGE7ysAwkAvkXfpZlC985pH2/ui64DKazbTW/4kN3yqozUxlXzI6cA==" }, "events": { "version": "1.1.1", @@ -5831,7 +5830,7 @@ }, "html-parse-stringify2": { "version": "2.0.1", - "resolved": "http://registry.npm.taobao.org/html-parse-stringify2/download/html-parse-stringify2-2.0.1.tgz", + "resolved": "https://registry.npmjs.org/html-parse-stringify2/-/html-parse-stringify2-2.0.1.tgz", "integrity": "sha1-3FZwtyksoVi3vJFsmmc1rIhyg0o=", "requires": { "void-elements": "^2.0.1" @@ -12974,7 +12973,7 @@ }, "void-elements": { "version": "2.0.1", - "resolved": "http://registry.npm.taobao.org/void-elements/download/void-elements-2.0.1.tgz", + "resolved": "https://registry.npmjs.org/void-elements/-/void-elements-2.0.1.tgz", "integrity": "sha1-wGavtYK7HLQSjWDqkjkulNXp2+w=" }, "ware": { diff --git a/package.json b/package.json index cf3ffbd..f5eb8aa 100644 --- a/package.json +++ b/package.json @@ -68,6 +68,7 @@ "axios": "^0.18.0", "classnames": "^2.2.6", "dayjs": "^1.7.7", + "eventemitter3": "^3.1.0", "i18next": "^11.10.0", "i18next-browser-languagedetector": "^2.2.3", "immer": "^1.7.2", diff --git a/src/lib/streamer.ts b/src/lib/streamer.ts new file mode 100644 index 0000000..def6bd0 --- /dev/null +++ b/src/lib/streamer.ts @@ -0,0 +1,91 @@ +import { to } from '@lib/helper' +import * as EventEmitter from 'eventemitter3' + +export interface Config { + url: string + headers?: { [key: string]: string } + bufferLength?: number + retryInterval?: number +} + +export class StreamReader { + protected EE = new EventEmitter() + protected config: Config + protected innerBuffer: T[] = [] + protected isClose = false + + constructor (config: Config) { + this.config = Object.assign( + { + bufferLength: 0, + retryInterval: 5000, + headers: {} + }, + config + ) + + this.loop() + } + + protected async loop () { + const [resp, err] = await to(fetch( + this.config.url, + { + mode: 'cors', + headers: this.config.headers + } + )) + if (err) { + this.retry(err) + return + } + + const reader = resp.body.getReader() + const decoder = new TextDecoder() + while (true) { + if (this.isClose) { + break + } + + const [{ value }, err] = await to(reader.read()) + if (err) { + this.retry(err) + break + } + + const lines = decoder.decode(value).trim().split('\n') + const data = lines.map(l => JSON.parse(l)) + this.EE.emit('data', data) + if (this.config.bufferLength > 0) { + this.innerBuffer.push(...data) + if (this.innerBuffer.length > this.config.bufferLength) { + this.innerBuffer.splice(0, this.innerBuffer.length - this.config.bufferLength) + } + } + } + } + + protected retry (err) { + if (!this.isClose) { + this.EE.emit('error', err) + window.setTimeout(this.loop, this.config.retryInterval) + } + } + + subscribe (event: string, callback: (data: T) => void) { + this.EE.addListener(event, callback) + } + + unsubscribe (event: string, callback: (data: T) => void) { + this.EE.removeListener(event, callback) + } + + buffer () { + return this.innerBuffer.slice() + } + + destory () { + this.EE.removeAllListeners() + this.isClose = true + } +}