diff --git a/package-lock.json b/package-lock.json index 9a332e5..65b0804 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1200,10 +1200,6 @@ "sax": "^1.2.4" } }, - "node-common": { - "version": "git+ssh://git@thebword.ddns.net:3122/watsonb8/node-common.git#3ee1400be94851335e822916861ea2eddb9e344f", - "from": "git+ssh://git@thebword.ddns.net:3122/watsonb8/node-common.git" - }, "node-fetch": { "version": "2.6.1", "resolved": "https://registry.npmjs.org/node-fetch/-/node-fetch-2.6.1.tgz", @@ -1533,24 +1529,6 @@ "glob": "^7.1.3" } }, - "rtsp-stream": { - "version": "file:../rtsp-stream", - "requires": { - "child_process": "^1.0.2" - }, - "dependencies": { - "@types/node": { - "version": "14.14.6", - "resolved": "https://registry.npmjs.org/@types/node/-/node-14.14.6.tgz", - "integrity": "sha512-6QlRuqsQ/Ox/aJEQWBEJG7A9+u7oSYl3mem/K8IzxXG/kAGbV1YPD9Bg9Zw3vyxC/YP+zONKwy8hGkSt1jxFMw==" - }, - "child_process": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/child_process/-/child_process-1.0.2.tgz", - "integrity": "sha1-sffn/HPSXn/R1FWtyU4UODAYK1o=" - } - } - }, "safe-buffer": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", diff --git a/package.json b/package.json index 78c654a..0bf7424 100644 --- a/package.json +++ b/package.json @@ -31,8 +31,6 @@ "canvas": "^2.6.1", "dotenv-extended": "^2.9.0", "mime-types": "^2.1.27", - "rtsp-stream": "file:../rtsp-stream", - "node-common": "git+ssh://git@thebword.ddns.net:3122/watsonb8/node-common.git", "tsyringe": "^4.4.0" }, "devDependencies": { diff --git a/src/events/event.ts b/src/events/event.ts new file mode 100644 index 0000000..2790a48 --- /dev/null +++ b/src/events/event.ts @@ -0,0 +1,12 @@ +import { EventDelegate } from "./eventDelegate"; + +export class Event extends Array> { + constructor() { + super(); + } + public fire = (source: T, args: K) => { + for (const delegate of this) { + delegate(source, args); + } + }; +} diff --git a/src/events/eventDelegate.ts b/src/events/eventDelegate.ts new file mode 100644 index 0000000..5f14e97 --- /dev/null +++ b/src/events/eventDelegate.ts @@ -0,0 +1 @@ +export type EventDelegate = (sender: T, args: K) => void; diff --git a/src/events/index.ts b/src/events/index.ts new file mode 100644 index 0000000..0adc040 --- /dev/null +++ b/src/events/index.ts @@ -0,0 +1,2 @@ +export * from "./event"; +export * from "./eventDelegate"; diff --git a/src/monitor.ts b/src/monitor.ts index 24ebabd..1d751ff 100644 --- a/src/monitor.ts +++ b/src/monitor.ts @@ -1,12 +1,12 @@ import { FaceMatcher } from "@vladmandic/face-api"; import { IRoom } from "./config"; -import { Rtsp } from "rtsp-stream/lib"; +import { Rtsp, IStreamEventArgs, ICloseEventArgs } from "./rtsp"; import canvas from "canvas"; import * as faceapi from "@vladmandic/face-api"; import { getFaceDetectorOptions, saveFile } from "./common"; import { nets } from "@vladmandic/face-api"; import { Logger } from "homebridge"; -import { Event } from "common/events"; +import { Event } from "./events"; const { Canvas, Image, ImageData } = canvas; export type MonitorState = { [label: string]: string | null }; @@ -34,15 +34,17 @@ export class Monitor { for (const room of this._rooms) { this._streamsByRoom[room.name] = [ ...room.rtspConnectionStrings.map((connectionString) => { - return new Rtsp(connectionString, { + const rtsp = new Rtsp(connectionString, { rate: 1, image: true, - }) - .on("data", (data: Buffer) => this.onData(room.name, data)) - .on("error", (error: string) => - this.onError(error, connectionString) - ) - .on("exit", () => this.onExit(connectionString)); + }); + rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) => + this.onData(room.name, args) + ); + rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => + this.onExit(connectionString, args) + ); + return rtsp; }), ]; @@ -107,8 +109,8 @@ export class Monitor { } } - private onData = async (room: string, data: Buffer) => { - const input = ((await canvas.loadImage(data)) as unknown) as ImageData; + private onData = async (room: string, args: IStreamEventArgs) => { + const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData; const out = faceapi.createCanvasFromMedia(input); const resultsQuery = await faceapi .detectAllFaces(out, getFaceDetectorOptions(this._faceDetectionNet)) @@ -120,7 +122,7 @@ export class Monitor { saveFile( "/Users/brandonwatson/Documents/Git/Gitea/homebridge-face-location/out", "Kitchen.jpg", - data + args.data ); break; } @@ -128,7 +130,7 @@ export class Monitor { saveFile( "/Users/brandonwatson/Documents/Git/Gitea/homebridge-face-location/out", "LivingRoom.jpg", - data + args.data ); break; } @@ -152,7 +154,7 @@ export class Monitor { private onError = (error: string, streamName: string) => { this._logger.info(`[${streamName}] ${error}`); }; - private onExit = (streamName: string) => { - this._logger.info(`[${streamName}] Stream has exited`); + private onExit = (streamName: string, args: ICloseEventArgs) => { + this._logger.info(`[${streamName}] Stream has exited: ${args.message}`); }; } diff --git a/src/rtsp/index.ts b/src/rtsp/index.ts new file mode 100644 index 0000000..b547a88 --- /dev/null +++ b/src/rtsp/index.ts @@ -0,0 +1,146 @@ +import { ChildProcess, spawn } from "child_process"; +import { Writable } from "stream"; +import { IOptions } from "./options"; +import { Event } from "../events"; + +const ef1 = "ff"; +const ef2 = "d9"; + +export interface IStreamEventArgs { + data: Buffer; +} + +export interface ICloseEventArgs { + message: string; +} +export interface IErrorEventArgs { + message?: string; + err?: Error; +} + +export class Rtsp { + private _connecteionString: string; + private _childProcess: ChildProcess | undefined; + private _started: boolean; + private _buffer: Buffer; + private _options: IOptions; + private _paused: boolean; + + private _dataEvent: Event; + private _closeEvent: Event; + private _errorEvent: Event; + + constructor(connectionString: string, options: IOptions) { + this._started = false; + this._connecteionString = connectionString; + this._childProcess = undefined; + this._buffer = Buffer.from(""); + this._options = options; + this._paused = false; + this._dataEvent = new Event(); + this._closeEvent = new Event(); + this._errorEvent = new Event(); + + this.onData = this.onData.bind(this); + } + + public get isStarted(): boolean { + return this._started; + } + + public get isPaused(): boolean { + return this._paused; + } + + public get dataEvent(): Event { + return this._dataEvent; + } + + public get closeEvent(): Event { + return this._closeEvent; + } + + public get errorEvent(): Event { + return this._errorEvent; + } + + public start(): void { + const argStrings = [ + `-i ${this._connecteionString}`, + `-r ${this._options.rate ?? 10}`, + this._options.image + ? `-f image2` + : `-codec:v ${this._options.codec ?? "libx264"}`, + `-update 1 -`, + ]; + const args = argStrings.join(" "); + this._childProcess = spawn("ffmpeg", args.split(/\s+/)); + + if (!this._childProcess) { + return; + } + + this._childProcess.stdout?.on("data", this.onData); + this._childProcess.on("exit", (code: number, signal: NodeJS.Signals) => + this._closeEvent.fire(this, { + message: "FFmpeg exited with code: " + code + " and signal: " + signal, + }) + ); + this._childProcess.on("error", (error: Error) => + this._errorEvent.fire(this, { err: error }) + ); + + //Only register this event if there are subscribers + if (this._childProcess.stderr && this._errorEvent.length > 0) { + this._childProcess.stderr.on("data", this.onStdErrorData); + } + } + + public close(): void { + this._childProcess && this._childProcess.kill("SIGKILL"); + this._closeEvent.fire(this, { message: "Process killed by user" }); + } + + public pause(): void { + this._paused = true; + } + + public resume(): void { + this._paused = false; + } + + public getStdin(): Writable | null { + return this._childProcess ? this._childProcess.stdin : null; + } + + private onStdErrorData = (data: any): void => { + if (!this._started) { + this._started = true; + } + let msg = ""; + data + .toString() + .split(/\n/) + .forEach((line: string) => { + msg += `${line}\n`; + }); + + this._errorEvent.fire(this, { message: msg }); + }; + + private onData(data: Buffer): void { + if (!this._paused && data.length > 1) { + this._buffer = this._buffer + ? Buffer.concat([this._buffer, data]) + : (this._buffer = Buffer.from(data)); + //End of image + if ( + data[data.length - 2].toString(16) == ef1 && + data[data.length - 1].toString(16) == ef2 + ) { + this._dataEvent.fire(this, { data: this._buffer }); + this._buffer = Buffer.from(""); + } + } + } +} diff --git a/src/rtsp/options.ts b/src/rtsp/options.ts new file mode 100644 index 0000000..49090db --- /dev/null +++ b/src/rtsp/options.ts @@ -0,0 +1,7 @@ +export interface IOptions { + rate?: number; + quality?: number; + resolution?: string; + codec?: string; + image?: boolean; +} diff --git a/tsconfig.json b/tsconfig.json index e82e96a..54253fd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -39,8 +39,8 @@ // "moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */ "baseUrl": "./" /* Base directory to resolve non-absolute module names. */, "paths": { - "rtsp/*": ["./node_modules/rtsp-stream/lib/*"], - "common/*": ["./node_modules/node-common/lib/*"] + "rtsp/*": ["node_modules/rtsp-stream/lib/*"], + "common/*": ["node_modules/node-common/lib/*"] } /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */, // "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */ // "typeRoots": [], /* List of folders to include type definitions from. */ @@ -62,13 +62,5 @@ "forceConsistentCasingInFileNames": true /* Disallow inconsistently-cased references to the same file. */ }, "include": ["./src"], - "exclude": ["node_modules"], - "references": [ - { - "path": "./node_modules/rtsp-stream/tsconfig.json" - }, - { - "path": "./node_modules/node-common/tsconfig.json" - } - ] + "exclude": ["node_modules"] }