Add: StreamReader for stream api

This commit is contained in:
Dreamacro 2018-10-26 18:43:28 +08:00
parent 50f6bbdbb6
commit 824c0a1177
3 changed files with 95 additions and 4 deletions

7
package-lock.json generated
View File

@ -3816,8 +3816,7 @@
"eventemitter3": { "eventemitter3": {
"version": "3.1.0", "version": "3.1.0",
"resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-3.1.0.tgz", "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-3.1.0.tgz",
"integrity": "sha512-ivIvhpq/Y0uSjcHDcOIccjmYjGLcP09MFGE7ysAwkAvkXfpZlC985pH2/ui64DKazbTW/4kN3yqozUxlXzI6cA==", "integrity": "sha512-ivIvhpq/Y0uSjcHDcOIccjmYjGLcP09MFGE7ysAwkAvkXfpZlC985pH2/ui64DKazbTW/4kN3yqozUxlXzI6cA=="
"dev": true
}, },
"events": { "events": {
"version": "1.1.1", "version": "1.1.1",
@ -5831,7 +5830,7 @@
}, },
"html-parse-stringify2": { "html-parse-stringify2": {
"version": "2.0.1", "version": "2.0.1",
"resolved": "http://registry.npm.taobao.org/html-parse-stringify2/download/html-parse-stringify2-2.0.1.tgz", "resolved": "https://registry.npmjs.org/html-parse-stringify2/-/html-parse-stringify2-2.0.1.tgz",
"integrity": "sha1-3FZwtyksoVi3vJFsmmc1rIhyg0o=", "integrity": "sha1-3FZwtyksoVi3vJFsmmc1rIhyg0o=",
"requires": { "requires": {
"void-elements": "^2.0.1" "void-elements": "^2.0.1"
@ -12974,7 +12973,7 @@
}, },
"void-elements": { "void-elements": {
"version": "2.0.1", "version": "2.0.1",
"resolved": "http://registry.npm.taobao.org/void-elements/download/void-elements-2.0.1.tgz", "resolved": "https://registry.npmjs.org/void-elements/-/void-elements-2.0.1.tgz",
"integrity": "sha1-wGavtYK7HLQSjWDqkjkulNXp2+w=" "integrity": "sha1-wGavtYK7HLQSjWDqkjkulNXp2+w="
}, },
"ware": { "ware": {

View File

@ -68,6 +68,7 @@
"axios": "^0.18.0", "axios": "^0.18.0",
"classnames": "^2.2.6", "classnames": "^2.2.6",
"dayjs": "^1.7.7", "dayjs": "^1.7.7",
"eventemitter3": "^3.1.0",
"i18next": "^11.10.0", "i18next": "^11.10.0",
"i18next-browser-languagedetector": "^2.2.3", "i18next-browser-languagedetector": "^2.2.3",
"immer": "^1.7.2", "immer": "^1.7.2",

91
src/lib/streamer.ts Normal file
View File

@ -0,0 +1,91 @@
import { to } from '@lib/helper'
import * as EventEmitter from 'eventemitter3'
export interface Config {
url: string
headers?: { [key: string]: string }
bufferLength?: number
retryInterval?: number
}
export class StreamReader<T> {
protected EE = new EventEmitter()
protected config: Config
protected innerBuffer: T[] = []
protected isClose = false
constructor (config: Config) {
this.config = Object.assign(
{
bufferLength: 0,
retryInterval: 5000,
headers: {}
},
config
)
this.loop()
}
protected async loop () {
const [resp, err] = await to(fetch(
this.config.url,
{
mode: 'cors',
headers: this.config.headers
}
))
if (err) {
this.retry(err)
return
}
const reader = resp.body.getReader()
const decoder = new TextDecoder()
while (true) {
if (this.isClose) {
break
}
const [{ value }, err] = await to(reader.read())
if (err) {
this.retry(err)
break
}
const lines = decoder.decode(value).trim().split('\n')
const data = lines.map(l => JSON.parse(l))
this.EE.emit('data', data)
if (this.config.bufferLength > 0) {
this.innerBuffer.push(...data)
if (this.innerBuffer.length > this.config.bufferLength) {
this.innerBuffer.splice(0, this.innerBuffer.length - this.config.bufferLength)
}
}
}
}
protected retry (err) {
if (!this.isClose) {
this.EE.emit('error', err)
window.setTimeout(this.loop, this.config.retryInterval)
}
}
subscribe<T> (event: string, callback: (data: T) => void) {
this.EE.addListener(event, callback)
}
unsubscribe<T> (event: string, callback: (data: T) => void) {
this.EE.removeListener(event, callback)
}
buffer () {
return this.innerBuffer.slice()
}
destory () {
this.EE.removeAllListeners()
this.isClose = true
}
}