Using common events

This commit is contained in:
watsonb8
2020-12-10 14:44:13 -05:00
parent e35c0f5a64
commit 7dc9758147
4 changed files with 118 additions and 78 deletions

View File

@ -2,11 +2,24 @@ import { ChildProcess, spawn } from "child_process";
import { EventEmitter } from "events";
import { Writable } from "stream";
import { IOptions } from "./options";
import { Event } from "common/events";
const ef1 = "ff";
const ef2 = "d9";
export class Rtsp extends EventEmitter {
export interface IStreamEventArgs {
data: Buffer;
}
export interface ICloseEventArgs {
message: string;
}
export interface IErrorEventArgs {
message?: string;
err?: Error;
}
export class Rtsp {
private _connecteionString: string;
private _childProcess: ChildProcess | undefined;
private _started: boolean;
@ -14,14 +27,20 @@ export class Rtsp extends EventEmitter {
private _options: IOptions;
private _paused: boolean;
private _dataEvent: Event<this, IStreamEventArgs>;
private _closeEvent: Event<this, ICloseEventArgs>;
private _errorEvent: Event<this, IErrorEventArgs>;
constructor(connectionString: string, options: IOptions) {
super();
this._started = false;
this._connecteionString = connectionString;
this._childProcess = undefined;
this._buffer = Buffer.from("");
this._options = options;
this._paused = false;
this._dataEvent = new Event();
this._closeEvent = new Event();
this._errorEvent = new Event();
this.onData = this.onData.bind(this);
}
@ -34,6 +53,18 @@ export class Rtsp extends EventEmitter {
return this._paused;
}
public get dataEvent(): Event<this, IStreamEventArgs> {
return this._dataEvent;
}
public get closeEvent(): Event<this, ICloseEventArgs> {
return this._closeEvent;
}
public get errorEvent(): Event<this, IErrorEventArgs> {
return this._errorEvent;
}
public start(): void {
const argStrings = [
`-i ${this._connecteionString}`,
@ -51,17 +82,24 @@ export class Rtsp extends EventEmitter {
}
this._childProcess.stdout?.on("data", this.onData);
this._childProcess.on("exit", this.onExit);
this._childProcess.on("error", this.onError);
this._childProcess.on("exit", (code: number, signal: NodeJS.Signals) =>
this._closeEvent.fire(this, {
message: "FFmpeg exited with code: " + code + " and signal: " + signal,
})
);
this._childProcess.on("error", (error: Error) =>
this._errorEvent.fire(this, { err: error })
);
if (this._childProcess.stderr) {
// this._childProcess.stderr.on("data", this.onStdErrorData);
//Only register this event if there are subscribers
if (this._childProcess.stderr && this._errorEvent.length > 0) {
this._childProcess.stderr.on("data", this.onStdErrorData);
}
}
public close(): void {
this._childProcess && this._childProcess.kill("SIGKILL");
this.emit("closed");
this._closeEvent.fire(this, { message: "Process killed by user" });
}
public pause(): void {
@ -76,10 +114,6 @@ export class Rtsp extends EventEmitter {
return this._childProcess ? this._childProcess.stdin : null;
}
private onError = (err: Error): void => {
this.emit("error", new Error("Failed to start stream: " + err.message));
};
private onStdErrorData = (data: any): void => {
if (!this._started) {
this._started = true;
@ -92,13 +126,7 @@ export class Rtsp extends EventEmitter {
msg += `${line}\n`;
});
this.emit("error", msg);
};
private onExit = (code: number, signal: NodeJS.Signals): void => {
const message =
"FFmpeg exited with code: " + code + " and signal: " + signal;
this.emit("closed", message);
this._errorEvent.fire(this, { message: msg });
};
private onData(data: Buffer): void {
@ -111,7 +139,7 @@ export class Rtsp extends EventEmitter {
data[data.length - 2].toString(16) == ef1 &&
data[data.length - 1].toString(16) == ef2
) {
this.emit("data", this._buffer);
this._dataEvent.fire(this, { data: this._buffer });
this._buffer = Buffer.from("");
}
}