From 5cec734a097e0783cbe04fa4b1a761755fb259cc Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Fri, 11 Dec 2020 00:02:10 -0500 Subject: [PATCH] Successful stream stress test Need to add stream watchdog timer and accessory timeout timer --- scripts/streamAndDetect.ts | 12 ++--- src/common.ts | 33 ++++++++++--- src/config.ts | 6 +++ src/{platform.ts => homeLocationPlatform.ts} | 2 +- src/index.ts | 2 +- src/locationAccessory.ts | 2 +- src/monitor.ts | 49 ++++++++++---------- src/rtsp/{index.ts => rtsp.ts} | 38 ++++++++++++--- 8 files changed, 96 insertions(+), 48 deletions(-) rename src/{platform.ts => homeLocationPlatform.ts} (99%) rename src/rtsp/{index.ts => rtsp.ts} (77%) diff --git a/scripts/streamAndDetect.ts b/scripts/streamAndDetect.ts index e8dedfb..8888a43 100644 --- a/scripts/streamAndDetect.ts +++ b/scripts/streamAndDetect.ts @@ -1,4 +1,4 @@ -import { Rtsp } from "rtsp-stream/lib"; +import { Rtsp, IStreamEventArgs } from "../src/rtsp/rtsp"; import { nets } from "@vladmandic/face-api"; import * as faceapi from "@vladmandic/face-api"; import canvas from "canvas"; @@ -37,10 +37,10 @@ const main = async () => { const content = JSON.parse(raw); const matcher = faceapi.FaceMatcher.fromJSON(content); - rtsp.on("data", async (data: Buffer) => { - const input = ((await canvas.loadImage(data)) as unknown) as ImageData; + rtsp.dataEvent.push(async (sender: Rtsp, args: IStreamEventArgs) => { + const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData; const out = faceapi.createCanvasFromMedia(input); - await saveFile("image.jpg", data); + await saveFile(process.env.OUT_DIR as string, "image.jpg", args.data); const resultsQuery = await faceapi .detectAllFaces(out, getFaceDetectorOptions(faceDetectionNet)) .withFaceLandmarks() @@ -52,10 +52,6 @@ const main = async () => { } }); - rtsp.on("error", (err) => { - // console.log(err); - }); - rtsp.start(); }; diff --git a/src/common.ts b/src/common.ts index 803decb..60da41f 100644 --- a/src/common.ts +++ b/src/common.ts @@ -15,12 +15,33 @@ export const getFaceDetectorOptions = (net: faceapi.NeuralNetwork) => { : new faceapi.TinyFaceDetectorOptions({ inputSize, scoreThreshold }); }; -export function saveFile(basePath: string, fileName: string, buf: Buffer) { - if (!fs.existsSync(basePath)) { - fs.mkdirSync(basePath); - } - - fs.writeFileSync(path.resolve(basePath, fileName), buf, "base64"); +export function saveFile( + basePath: string, + fileName: string, + buf: Buffer +): Promise { + const writeFile = (): Promise => { + return new Promise((resolve, reject) => { + fs.writeFile(path.resolve(basePath, fileName), buf, "base64", (err) => { + if (err) { + return reject(err); + } + resolve(); + }); + }); + }; + return new Promise(async (resolve, reject) => { + if (!fs.existsSync(basePath)) { + fs.mkdir(basePath, async (err) => { + if (err) { + return reject(err); + } + resolve(await writeFile()); + }); + } else { + resolve(await writeFile()); + } + }); } export const delay = (ms: number): Promise => { diff --git a/src/config.ts b/src/config.ts index 53a2a8b..5783546 100644 --- a/src/config.ts +++ b/src/config.ts @@ -4,10 +4,12 @@ export interface IConfig extends PlatformConfig { refImageDirectory: string; trainedModelDirectory: string; weightDirectory: string; + outputDirectory: string; trainOnStartup: boolean; rooms: Array; detectionTimeout: number; debug: boolean; + writeOutput: boolean; } export interface IRoom { @@ -27,6 +29,10 @@ export const isConfig = (object: any): object is IConfig => { "refImageDirectory" in object && "trainedModelDirectory" in object && "weightDirectory" in object && + "outputDirectory" in object && + "trainOnStartup" in object && + "detectionTimeout" in object && + "writeOutput" in object && "rooms" in object && roomsOkay ); diff --git a/src/platform.ts b/src/homeLocationPlatform.ts similarity index 99% rename from src/platform.ts rename to src/homeLocationPlatform.ts index b32ef45..417e470 100644 --- a/src/platform.ts +++ b/src/homeLocationPlatform.ts @@ -104,7 +104,7 @@ export class HomeLocationPlatform implements DynamicPlatformPlugin { this.config.rooms, faceMatcher, this.log, - this.config.debug + this.config ); locationMonitor.startStreams(); diff --git a/src/index.ts b/src/index.ts index edf4b7d..b678c9f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import { API } from "homebridge"; import { PLATFORM_NAME } from "./settings"; -import { HomeLocationPlatform } from "./platform"; +import { HomeLocationPlatform } from "./homeLocationPlatform"; /** * This method registers the platform with Homebridge diff --git a/src/locationAccessory.ts b/src/locationAccessory.ts index de1e1da..89123ce 100644 --- a/src/locationAccessory.ts +++ b/src/locationAccessory.ts @@ -4,7 +4,7 @@ import { PlatformAccessory, } from "homebridge"; import { Monitor, IStateChangeEventArgs } from "./monitor"; -import { HomeLocationPlatform } from "./platform"; +import { HomeLocationPlatform } from "./homeLocationPlatform"; import { IRoom } from "./config"; /** diff --git a/src/monitor.ts b/src/monitor.ts index 1d751ff..01c4170 100644 --- a/src/monitor.ts +++ b/src/monitor.ts @@ -1,12 +1,19 @@ import { FaceMatcher } from "@vladmandic/face-api"; import { IRoom } from "./config"; -import { Rtsp, IStreamEventArgs, ICloseEventArgs } from "./rtsp"; +import { + Rtsp, + IStreamEventArgs, + ICloseEventArgs, + IErrorEventArgs, + IMessageEventArgs, +} from "./rtsp/rtsp"; import canvas from "canvas"; import * as faceapi from "@vladmandic/face-api"; import { getFaceDetectorOptions, saveFile } from "./common"; import { nets } from "@vladmandic/face-api"; import { Logger } from "homebridge"; import { Event } from "./events"; +import { IConfig } from "./config"; const { Canvas, Image, ImageData } = canvas; export type MonitorState = { [label: string]: string | null }; @@ -26,7 +33,7 @@ export class Monitor { private _rooms: Array, private _matcher: FaceMatcher, private _logger: Logger, - private _isDebug: boolean + private _config: IConfig ) { this._stateChangedEvent = new Event(); @@ -35,7 +42,7 @@ export class Monitor { this._streamsByRoom[room.name] = [ ...room.rtspConnectionStrings.map((connectionString) => { const rtsp = new Rtsp(connectionString, { - rate: 1, + rate: 0.7, image: true, }); rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) => @@ -44,6 +51,15 @@ export class Monitor { rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => this.onExit(connectionString, args) ); + rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) => + this.onError(args, connectionString) + ); + if (this._config.debug) { + rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => { + this._logger.info(`[${connectionString}] ${args.message}`); + }); + } + return rtsp; }), ]; @@ -116,25 +132,10 @@ export class Monitor { .detectAllFaces(out, getFaceDetectorOptions(this._faceDetectionNet)) .withFaceLandmarks() .withFaceDescriptors(); - if (this._isDebug) { - switch (room) { - case "Kitchen": { - saveFile( - "/Users/brandonwatson/Documents/Git/Gitea/homebridge-face-location/out", - "Kitchen.jpg", - args.data - ); - break; - } - case "LivingRoom": { - saveFile( - "/Users/brandonwatson/Documents/Git/Gitea/homebridge-face-location/out", - "LivingRoom.jpg", - args.data - ); - break; - } - } + + //Write to output image + if (this._config.writeOutput) { + await saveFile(this._config.outputDirectory, room + ".jpg", args.data); } for (const res of resultsQuery) { @@ -151,8 +152,8 @@ export class Monitor { } }; - private onError = (error: string, streamName: string) => { - this._logger.info(`[${streamName}] ${error}`); + private onError = (args: IErrorEventArgs, streamName: string) => { + this._logger.info(`[${streamName}] ${args.message}`); }; private onExit = (streamName: string, args: ICloseEventArgs) => { this._logger.info(`[${streamName}] Stream has exited: ${args.message}`); diff --git a/src/rtsp/index.ts b/src/rtsp/rtsp.ts similarity index 77% rename from src/rtsp/index.ts rename to src/rtsp/rtsp.ts index b547a88..706675b 100644 --- a/src/rtsp/index.ts +++ b/src/rtsp/rtsp.ts @@ -18,6 +18,10 @@ export interface IErrorEventArgs { err?: Error; } +export interface IMessageEventArgs { + message: string; +} + export class Rtsp { private _connecteionString: string; private _childProcess: ChildProcess | undefined; @@ -29,6 +33,7 @@ export class Rtsp { private _dataEvent: Event; private _closeEvent: Event; private _errorEvent: Event; + private _messageEvent: Event; constructor(connectionString: string, options: IOptions) { this._started = false; @@ -40,6 +45,7 @@ export class Rtsp { this._dataEvent = new Event(); this._closeEvent = new Event(); this._errorEvent = new Event(); + this._messageEvent = new Event(); this.onData = this.onData.bind(this); } @@ -56,6 +62,10 @@ export class Rtsp { return this._dataEvent; } + public get messageEvent(): Event { + return this._messageEvent; + } + public get closeEvent(): Event { return this._closeEvent; } @@ -68,6 +78,7 @@ export class Rtsp { const argStrings = [ `-i ${this._connecteionString}`, `-r ${this._options.rate ?? 10}`, + `-vf mpdecimate,setpts=N/FRAME_RATE/TB`, this._options.image ? `-f image2` : `-codec:v ${this._options.codec ?? "libx264"}`, @@ -81,19 +92,32 @@ export class Rtsp { } this._childProcess.stdout?.on("data", this.onData); + this._childProcess.stdout?.on("error", (err) => + console.log("And error occurred" + err) + ); + this._childProcess.stdout?.on("close", () => console.log("Stream closed")); + this._childProcess.stdout?.on("end", () => console.log("Stream ended")); + + //Only register this event if there are subscribers + if (this._childProcess.stderr && this._messageEvent.length > 0) { + this._childProcess.stderr.on("data", this.onMessage); + } + + this._childProcess.on("close", (code: number, signal: NodeJS.Signals) => + this._closeEvent.fire(this, { + message: "FFmpeg exited with code: " + code + " and signal: " + signal, + }) + ); + this._childProcess.on("exit", (code: number, signal: NodeJS.Signals) => this._closeEvent.fire(this, { message: "FFmpeg exited with code: " + code + " and signal: " + signal, }) ); + this._childProcess.on("error", (error: Error) => this._errorEvent.fire(this, { err: error }) ); - - //Only register this event if there are subscribers - if (this._childProcess.stderr && this._errorEvent.length > 0) { - this._childProcess.stderr.on("data", this.onStdErrorData); - } } public close(): void { @@ -113,7 +137,7 @@ export class Rtsp { return this._childProcess ? this._childProcess.stdin : null; } - private onStdErrorData = (data: any): void => { + private onMessage = (data: any): void => { if (!this._started) { this._started = true; } @@ -125,7 +149,7 @@ export class Rtsp { msg += `${line}\n`; }); - this._errorEvent.fire(this, { message: msg }); + this._messageEvent.fire(this, { message: msg }); }; private onData(data: Buffer): void {