From 9316028e2cf6702257ae554c8fcd09dbbd2d6c2b Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Fri, 11 Dec 2020 10:45:34 -0500 Subject: [PATCH] Adding stream watchdog timer to reset stream if it fails --- src/config.ts | 11 +-- src/homeLocationPlatform.ts | 2 +- src/locationAccessory.ts | 2 +- src/{ => monitor}/monitor.ts | 131 +++++++++++++++++++++++++---------- src/monitor/monitorState.ts | 1 + src/monitor/stream.ts | 8 +++ 6 files changed, 110 insertions(+), 45 deletions(-) rename src/{ => monitor}/monitor.ts (50%) create mode 100644 src/monitor/monitorState.ts create mode 100644 src/monitor/stream.ts diff --git a/src/config.ts b/src/config.ts index 5783546..b45e570 100644 --- a/src/config.ts +++ b/src/config.ts @@ -7,9 +7,11 @@ export interface IConfig extends PlatformConfig { outputDirectory: string; trainOnStartup: boolean; rooms: Array; - detectionTimeout: number; - debug: boolean; - writeOutput: boolean; + detectionTimeout?: number; + watchdogTimeout?: number; + debug?: boolean; + writeOutput?: boolean; + rate?: number; } export interface IRoom { @@ -25,14 +27,13 @@ export const isConfig = (object: any): object is IConfig => { const roomsOkay = object["rooms"].filter((room: any) => isRoom(room)).length === object["rooms"].length; + return ( "refImageDirectory" in object && "trainedModelDirectory" in object && "weightDirectory" in object && "outputDirectory" in object && "trainOnStartup" in object && - "detectionTimeout" in object && - "writeOutput" in object && "rooms" in object && roomsOkay ); diff --git a/src/homeLocationPlatform.ts b/src/homeLocationPlatform.ts index 417e470..bf3ac36 100644 --- a/src/homeLocationPlatform.ts +++ b/src/homeLocationPlatform.ts @@ -19,7 +19,7 @@ import { FaceMatcher, } from "@vladmandic/face-api"; import * as mime from "mime-types"; -import { Monitor } from "./monitor"; +import { Monitor } from "./monitor/monitor"; import { getFaceDetectorOptions } from "./common"; require("@tensorflow/tfjs-node"); diff --git a/src/locationAccessory.ts b/src/locationAccessory.ts index 89123ce..45ab8b1 100644 --- a/src/locationAccessory.ts +++ b/src/locationAccessory.ts @@ -3,7 +3,7 @@ import { CharacteristicGetCallback, PlatformAccessory, } from "homebridge"; -import { Monitor, IStateChangeEventArgs } from "./monitor"; +import { Monitor, IStateChangeEventArgs } from "./monitor/monitor"; import { HomeLocationPlatform } from "./homeLocationPlatform"; import { IRoom } from "./config"; diff --git a/src/monitor.ts b/src/monitor/monitor.ts similarity index 50% rename from src/monitor.ts rename to src/monitor/monitor.ts index 01c4170..37d6503 100644 --- a/src/monitor.ts +++ b/src/monitor/monitor.ts @@ -1,22 +1,26 @@ import { FaceMatcher } from "@vladmandic/face-api"; -import { IRoom } from "./config"; +import { IRoom } from "../config"; import { Rtsp, IStreamEventArgs, ICloseEventArgs, IErrorEventArgs, IMessageEventArgs, -} from "./rtsp/rtsp"; +} from "../rtsp/rtsp"; import canvas from "canvas"; import * as faceapi from "@vladmandic/face-api"; -import { getFaceDetectorOptions, saveFile } from "./common"; +import { getFaceDetectorOptions, saveFile } from "../common"; import { nets } from "@vladmandic/face-api"; import { Logger } from "homebridge"; -import { Event } from "./events"; -import { IConfig } from "./config"; +import { Event } from "../events"; +import { IConfig } from "../config"; +import { MonitorState } from "./monitorState"; +import { IStream } from "./stream"; const { Canvas, Image, ImageData } = canvas; -export type MonitorState = { [label: string]: string | null }; +const defaultWatchDog = 30000; +const defaultRate = 0.7; + export interface IStateChangeEventArgs { label: string; old: string | null; @@ -25,12 +29,12 @@ export interface IStateChangeEventArgs { export class Monitor { private _state: MonitorState = {}; - private _streamsByRoom: { [roomName: string]: Array } = {}; + private _streamsByRoom: { [roomName: string]: Array } = {}; private _faceDetectionNet = nets.ssdMobilenetv1; private _stateChangedEvent: Event; constructor( - private _rooms: Array, + rooms: Array, private _matcher: FaceMatcher, private _logger: Logger, private _config: IConfig @@ -38,29 +42,10 @@ export class Monitor { this._stateChangedEvent = new Event(); //Initialize state - for (const room of this._rooms) { + for (const room of rooms) { this._streamsByRoom[room.name] = [ ...room.rtspConnectionStrings.map((connectionString) => { - const rtsp = new Rtsp(connectionString, { - rate: 0.7, - image: true, - }); - rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) => - this.onData(room.name, args) - ); - rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => - this.onExit(connectionString, args) - ); - rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) => - this.onError(args, connectionString) - ); - if (this._config.debug) { - rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => { - this._logger.info(`[${connectionString}] ${args.message}`); - }); - } - - return rtsp; + return this.getNewStream(connectionString, room.name); }), ]; @@ -107,7 +92,14 @@ export class Monitor { public startStreams() { for (const key in this._streamsByRoom) { for (const stream of this._streamsByRoom[key]) { - stream.start(); + //Start stream + stream.rtsp.start(); + + //Start watchdog timer + stream.watchdogTimer = setTimeout( + () => this.onWatchdogTimeout(stream, key), + this._config.watchdogTimeout ?? defaultWatchDog + ); } } } @@ -120,12 +112,29 @@ export class Monitor { public closeStreams() { for (const key in this._streamsByRoom) { for (const stream of this._streamsByRoom[key]) { - stream.close(); + stream.rtsp.close(); + + //Stop watchdog timer + if (stream.watchdogTimer) { + clearTimeout(stream.watchdogTimer); + } } } } - private onData = async (room: string, args: IStreamEventArgs) => { + private onData = async ( + room: string, + stream: IStream, + args: IStreamEventArgs + ) => { + //Reset watchdog timer for the stream + clearTimeout(stream.watchdogTimer!); + stream.watchdogTimer = setTimeout( + () => this.onWatchdogTimeout(stream, room), + this._config.watchdogTimeout ?? 30000 + ); + + //Detect faces in image const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData; const out = faceapi.createCanvasFromMedia(input); const resultsQuery = await faceapi @@ -148,14 +157,60 @@ export class Monitor { label: bestMatch.label, }); - this._logger.info(`Face Detected: ${bestMatch.label} in room ${room}`); + if (this._config.debug) { + this._logger.info(`Face Detected: ${bestMatch.label} in room ${room}`); + } } }; - private onError = (args: IErrorEventArgs, streamName: string) => { - this._logger.info(`[${streamName}] ${args.message}`); - }; - private onExit = (streamName: string, args: ICloseEventArgs) => { - this._logger.info(`[${streamName}] Stream has exited: ${args.message}`); + private getNewStream(connectionString: string, roomName: string): IStream { + const stream = { + rtsp: new Rtsp(connectionString, { + rate: this._config.rate ?? defaultRate, + image: true, + }), + watchdogTimer: null, + detectionTimer: null, + connectionString: connectionString, + }; + + //Subscribe to rtsp events + stream.rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) => + this.onData(roomName, stream, args) + ); + stream.rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => { + this._logger.info( + `[${connectionString}] Stream has exited: ${args.message}` + ); + }); + stream.rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) => { + this._logger.info(`[${connectionString}] ${args.message}`); + }); + if (this._config.debug) { + stream.rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => { + this._logger.info(`[${connectionString}] ${args.message}`); + }); + } + + return stream; + } + + private onWatchdogTimeout = async (stream: IStream, roomName: string) => { + this._logger.info( + `[${stream.connectionString}] Watchdog timeout: restarting stream` + ); + + //Close and remove old stream + stream.rtsp.close(); + this._streamsByRoom[roomName].splice( + this._streamsByRoom[roomName].indexOf(stream), + 1 + ); + + //Create and add new stream + this._streamsByRoom[roomName].push( + this.getNewStream(stream.connectionString, roomName) + ); + stream.rtsp.start(); }; } diff --git a/src/monitor/monitorState.ts b/src/monitor/monitorState.ts new file mode 100644 index 0000000..8c2cf5c --- /dev/null +++ b/src/monitor/monitorState.ts @@ -0,0 +1 @@ +export type MonitorState = { [label: string]: string | null }; diff --git a/src/monitor/stream.ts b/src/monitor/stream.ts new file mode 100644 index 0000000..5876ec6 --- /dev/null +++ b/src/monitor/stream.ts @@ -0,0 +1,8 @@ +import { Rtsp } from "../rtsp/rtsp"; + +export interface IStream { + rtsp: Rtsp; + connectionString: string; + watchdogTimer: NodeJS.Timeout | null; + detectionTimer: NodeJS.Timeout | null; +}