javascript 请求 sse stream,解析结果

2023-05-28 20:03:08 浏览数 (1)

使用 JavaScript 接收 SSE 流并打印解析结果。

例如,目前 OpenAI API 的 stream 返回就是 SSE。浏览器中请求 SSE 的标准方式是 EventSource,但它只能发起 GET 请求,无法像普通 POST 请求那样携带 body 数据或自定义 header。如果你的接口需要鉴权,需要像 POST 请求一样携带 header 或 body 数据,那么 EventSource 就没法用了。

查了一下,目前并没有直接的支持方案,一些fetch库,比如axios,也是无法支持的。

这里有一种方法,可以实现这种效果,给大家分享一下:

代码语言:javascript复制

      /**
       * Minimal Server-Sent Events (SSE) client built on XMLHttpRequest.
       *
       * Unlike the native EventSource, the request method, headers and body can be
       * chosen by the caller, so it can be used against endpoints that require a
       * POST body or authentication headers.
       *
       * Events are delivered both to `on<type>` own properties (e.g. `onmessage`)
       * and to listeners registered with `addEventListener`.
       *
       * @param {string} url - Endpoint to request.
       * @param {Object} [options]
       * @param {Object} [options.headers] - Request headers, name -> value.
       * @param {*} [options.payload] - Request body handed to `xhr.send()`.
       * @param {string} [options.method] - HTTP method; defaults to 'POST' when a
       *   truthy payload is given, otherwise 'GET'.
       * @param {boolean} [options.withCredentials] - Forwarded to the XHR.
       */
      var SSE = function (url, options) {
        // Allow calling without `new`.
        if (!(this instanceof SSE)) {
          return new SSE(url, options);
        }

        // A blank line (two consecutive line terminators of the same kind) ends an event.
        var EVENT_BOUNDARY = /\r\n\r\n|\r\r|\n\n/;
        // A single line terminator inside an event block.
        var LINE_BREAK = /\r\n|\r|\n/;

        this.INITIALIZING = -1;
        this.CONNECTING = 0;
        this.OPEN = 1;
        this.CLOSED = 2;

        this.url = url;

        options = options || {};
        this.headers = options.headers || {};
        this.payload = options.payload !== undefined ? options.payload : '';
        this.method = options.method || (this.payload && 'POST') || 'GET';
        this.withCredentials = !!options.withCredentials;

        this.FIELD_SEPARATOR = ':';
        this.listeners = {};

        this.xhr = null;
        this.readyState = this.INITIALIZING;
        // Number of characters of xhr.responseText already consumed.
        this.progress = 0;
        // Raw text received so far that does not yet form a complete event.
        this.chunk = '';

        /**
         * Register a listener for an event type; the same function is only added once.
         */
        this.addEventListener = function (type, listener) {
          if (this.listeners[type] === undefined) {
            this.listeners[type] = [];
          }

          if (this.listeners[type].indexOf(listener) === -1) {
            this.listeners[type].push(listener);
          }
        };

        /**
         * Remove a previously registered listener; drops the type entry once empty.
         */
        this.removeEventListener = function (type, listener) {
          if (this.listeners[type] === undefined) {
            return;
          }

          var remaining = this.listeners[type].filter(function (element) {
            return element !== listener;
          });
          if (remaining.length === 0) {
            delete this.listeners[type];
          } else {
            this.listeners[type] = remaining;
          }
        };

        /**
         * Deliver an event to the `on<type>` handler (if set as an own property)
         * and then to the registered listeners.
         *
         * @param {Object|null} e - Event to dispatch; a falsy value is a no-op.
         * @returns {boolean} false if a handler called preventDefault(), else true.
         */
        this.dispatchEvent = function (e) {
          if (!e) {
            return true;
          }

          e.source = this;

          var onHandler = 'on' + e.type;
          if (Object.prototype.hasOwnProperty.call(this, onHandler)) {
            this[onHandler].call(this, e);
            if (e.defaultPrevented) {
              return false;
            }
          }

          if (this.listeners[e.type]) {
            // every() stops at the first listener that prevents the default.
            return this.listeners[e.type].every(function (callback) {
              callback(e);
              return !e.defaultPrevented;
            });
          }

          return true;
        };

        /** Update readyState and announce it with a 'readystatechange' event. */
        this._setReadyState = function (state) {
          var event = new CustomEvent('readystatechange');
          event.readyState = state;
          this.readyState = state;
          this.dispatchEvent(event);
        };

        /** Emit 'error' (with the response body as `data`) and close the stream. */
        this._onStreamFailure = function (e) {
          var event = new CustomEvent('error');
          event.data = e.currentTarget.response;
          this.dispatchEvent(event);
          this.close();
        };

        /** Emit 'abort' and close the stream. */
        this._onStreamAbort = function (e) {
          this.dispatchEvent(new CustomEvent('abort'));
          this.close();
        };

        /**
         * Consume the newly received part of the response and dispatch every
         * event that is now complete.
         */
        this._onStreamProgress = function (e) {
          if (!this.xhr) {
            return;
          }

          if (this.xhr.status !== 200) {
            this._onStreamFailure(e);
            return;
          }

          if (this.readyState === this.CONNECTING) {
            this.dispatchEvent(new CustomEvent('open'));
            this._setReadyState(this.OPEN);
          }

          var data = this.xhr.responseText.substring(this.progress);
          this.progress += data.length;

          // Buffer the raw text and only cut at blank lines, so that an event
          // boundary split across two progress callbacks is still recognised and
          // a single CRLF is never mistaken for a boundary.
          this.chunk += data;
          var parts = this.chunk.split(EVENT_BOUNDARY);
          // The last piece has no terminator yet; keep it for the next callback.
          this.chunk = parts.pop();
          parts.forEach(function (part) {
            this.dispatchEvent(this._parseEventChunk(part.trim()));
          }, this);
        };

        /** Flush whatever is left once the response has finished loading. */
        this._onStreamLoaded = function (e) {
          this._onStreamProgress(e);

          // Parse the last chunk, which may lack a trailing blank line.
          this.dispatchEvent(this._parseEventChunk(this.chunk));
          this.chunk = '';
        };

        /**
         * Parse a received SSE event chunk into a constructed event object.
         *
         * Only the `id`, `retry`, `data` and `event` fields are recognised; lines
         * without a field name (empty lines, comments starting with ':') are skipped.
         * NOTE: multiple `data` lines are concatenated without a separator.
         *
         * @param {string} chunk - Text of one event block.
         * @returns {CustomEvent|null} Event typed after the `event` field
         *   ('message' by default) carrying `data` and `id`, or null for an empty chunk.
         */
        this._parseEventChunk = function (chunk) {
          if (!chunk || chunk.length === 0) {
            return null;
          }

          var e = { id: null, retry: null, data: '', event: 'message' };
          chunk.split(LINE_BREAK).forEach(function (line) {
            line = line.trimEnd();
            var index = line.indexOf(this.FIELD_SEPARATOR);
            if (index <= 0) {
              // Line was either empty, or started with a separator and is a comment.
              // Either way, ignore.
              return;
            }

            var field = line.substring(0, index);
            if (!(field in e)) {
              return;
            }

            var value = line.substring(index + 1).trimStart();
            if (field === 'data') {
              e[field] += value;
            } else {
              e[field] = value;
            }
          }, this);

          var event = new CustomEvent(e.event);
          event.data = e.data;
          event.id = e.id;
          return event;
        };

        /** Mark the stream CLOSED once the underlying XHR reports DONE. */
        this._checkStreamClosed = function () {
          if (!this.xhr) {
            return;
          }

          if (this.xhr.readyState === XMLHttpRequest.DONE) {
            this._setReadyState(this.CLOSED);
          }
        };

        /** Open the connection and start streaming events. */
        this.stream = function () {
          this._setReadyState(this.CONNECTING);

          this.xhr = new XMLHttpRequest();
          this.xhr.addEventListener('progress', this._onStreamProgress.bind(this));
          this.xhr.addEventListener('load', this._onStreamLoaded.bind(this));
          this.xhr.addEventListener('readystatechange', this._checkStreamClosed.bind(this));
          this.xhr.addEventListener('error', this._onStreamFailure.bind(this));
          this.xhr.addEventListener('abort', this._onStreamAbort.bind(this));
          this.xhr.open(this.method, this.url);
          for (var header in this.headers) {
            this.xhr.setRequestHeader(header, this.headers[header]);
          }
          this.xhr.withCredentials = this.withCredentials;
          this.xhr.send(this.payload);
        };

        /** Abort the request (if one is in flight) and move to CLOSED. */
        this.close = function () {
          if (this.readyState === this.CLOSED) {
            return;
          }

          // xhr is null when close() is called before stream().
          if (this.xhr) {
            this.xhr.abort();
            this.xhr = null;
          }
          this._setReadyState(this.CLOSED);
        };
      };
    

可以像这样使用:

代码语言:javascript复制
        // Example: open a POST-based SSE stream with custom headers and a JSON body.
        const url = 'http://localhost:8000/xxx/xxxx?a=0';
        const source = new SSE(url, {
          method: 'POST',
          // withCredentials: true,
          headers: {
            'Content-Type': 'application/json',
            'Cache-Control': 'no-cache',
            'self-header1': 'xxxx',
            'self-header2': 'xxxx',
          },
          // POST body
          payload: JSON.stringify({
            a: 'a',
            b: 'b',
          }),
        });
        // The handler receives the event object; the text that followed
        // `data:` in the stream is available as `event.data`.
        source.onmessage = function (event) {
          console.log('111111 onmessage', event.data);
        };
        // On failure the event's `data` property holds the XHR response body.
        source.onerror = function (err) {
          console.log('111111 onerror', err);
        };
        // Nothing is sent until stream() is called.
        source.stream();

0 人点赞