Adding stream watchdog timer to reset stream if it fails

This commit is contained in:
watsonb8 2020-12-11 10:45:34 -05:00
parent 10b7ecccb7
commit 9316028e2c
6 changed files with 110 additions and 45 deletions

View File

@ -7,9 +7,11 @@ export interface IConfig extends PlatformConfig {
outputDirectory: string; outputDirectory: string;
trainOnStartup: boolean; trainOnStartup: boolean;
rooms: Array<IRoom>; rooms: Array<IRoom>;
detectionTimeout: number; detectionTimeout?: number;
debug: boolean; watchdogTimeout?: number;
writeOutput: boolean; debug?: boolean;
writeOutput?: boolean;
rate?: number;
} }
export interface IRoom { export interface IRoom {
@ -25,14 +27,13 @@ export const isConfig = (object: any): object is IConfig => {
const roomsOkay = const roomsOkay =
object["rooms"].filter((room: any) => isRoom(room)).length === object["rooms"].filter((room: any) => isRoom(room)).length ===
object["rooms"].length; object["rooms"].length;
return ( return (
"refImageDirectory" in object && "refImageDirectory" in object &&
"trainedModelDirectory" in object && "trainedModelDirectory" in object &&
"weightDirectory" in object && "weightDirectory" in object &&
"outputDirectory" in object && "outputDirectory" in object &&
"trainOnStartup" in object && "trainOnStartup" in object &&
"detectionTimeout" in object &&
"writeOutput" in object &&
"rooms" in object && "rooms" in object &&
roomsOkay roomsOkay
); );

View File

@ -19,7 +19,7 @@ import {
FaceMatcher, FaceMatcher,
} from "@vladmandic/face-api"; } from "@vladmandic/face-api";
import * as mime from "mime-types"; import * as mime from "mime-types";
import { Monitor } from "./monitor"; import { Monitor } from "./monitor/monitor";
import { getFaceDetectorOptions } from "./common"; import { getFaceDetectorOptions } from "./common";
require("@tensorflow/tfjs-node"); require("@tensorflow/tfjs-node");

View File

@ -3,7 +3,7 @@ import {
CharacteristicGetCallback, CharacteristicGetCallback,
PlatformAccessory, PlatformAccessory,
} from "homebridge"; } from "homebridge";
import { Monitor, IStateChangeEventArgs } from "./monitor"; import { Monitor, IStateChangeEventArgs } from "./monitor/monitor";
import { HomeLocationPlatform } from "./homeLocationPlatform"; import { HomeLocationPlatform } from "./homeLocationPlatform";
import { IRoom } from "./config"; import { IRoom } from "./config";

View File

@ -1,22 +1,26 @@
import { FaceMatcher } from "@vladmandic/face-api"; import { FaceMatcher } from "@vladmandic/face-api";
import { IRoom } from "./config"; import { IRoom } from "../config";
import { import {
Rtsp, Rtsp,
IStreamEventArgs, IStreamEventArgs,
ICloseEventArgs, ICloseEventArgs,
IErrorEventArgs, IErrorEventArgs,
IMessageEventArgs, IMessageEventArgs,
} from "./rtsp/rtsp"; } from "../rtsp/rtsp";
import canvas from "canvas"; import canvas from "canvas";
import * as faceapi from "@vladmandic/face-api"; import * as faceapi from "@vladmandic/face-api";
import { getFaceDetectorOptions, saveFile } from "./common"; import { getFaceDetectorOptions, saveFile } from "../common";
import { nets } from "@vladmandic/face-api"; import { nets } from "@vladmandic/face-api";
import { Logger } from "homebridge"; import { Logger } from "homebridge";
import { Event } from "./events"; import { Event } from "../events";
import { IConfig } from "./config"; import { IConfig } from "../config";
import { MonitorState } from "./monitorState";
import { IStream } from "./stream";
const { Canvas, Image, ImageData } = canvas; const { Canvas, Image, ImageData } = canvas;
export type MonitorState = { [label: string]: string | null }; const defaultWatchDog = 30000;
const defaultRate = 0.7;
export interface IStateChangeEventArgs { export interface IStateChangeEventArgs {
label: string; label: string;
old: string | null; old: string | null;
@ -25,12 +29,12 @@ export interface IStateChangeEventArgs {
export class Monitor { export class Monitor {
private _state: MonitorState = {}; private _state: MonitorState = {};
private _streamsByRoom: { [roomName: string]: Array<Rtsp> } = {}; private _streamsByRoom: { [roomName: string]: Array<IStream> } = {};
private _faceDetectionNet = nets.ssdMobilenetv1; private _faceDetectionNet = nets.ssdMobilenetv1;
private _stateChangedEvent: Event<this, IStateChangeEventArgs>; private _stateChangedEvent: Event<this, IStateChangeEventArgs>;
constructor( constructor(
private _rooms: Array<IRoom>, rooms: Array<IRoom>,
private _matcher: FaceMatcher, private _matcher: FaceMatcher,
private _logger: Logger, private _logger: Logger,
private _config: IConfig private _config: IConfig
@ -38,29 +42,10 @@ export class Monitor {
this._stateChangedEvent = new Event(); this._stateChangedEvent = new Event();
//Initialize state //Initialize state
for (const room of this._rooms) { for (const room of rooms) {
this._streamsByRoom[room.name] = [ this._streamsByRoom[room.name] = [
...room.rtspConnectionStrings.map((connectionString) => { ...room.rtspConnectionStrings.map((connectionString) => {
const rtsp = new Rtsp(connectionString, { return this.getNewStream(connectionString, room.name);
rate: 0.7,
image: true,
});
rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) =>
this.onData(room.name, args)
);
rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) =>
this.onExit(connectionString, args)
);
rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) =>
this.onError(args, connectionString)
);
if (this._config.debug) {
rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => {
this._logger.info(`[${connectionString}] ${args.message}`);
});
}
return rtsp;
}), }),
]; ];
@ -107,7 +92,14 @@ export class Monitor {
public startStreams() { public startStreams() {
for (const key in this._streamsByRoom) { for (const key in this._streamsByRoom) {
for (const stream of this._streamsByRoom[key]) { for (const stream of this._streamsByRoom[key]) {
stream.start(); //Start stream
stream.rtsp.start();
//Start watchdog timer
stream.watchdogTimer = setTimeout(
() => this.onWatchdogTimeout(stream, key),
this._config.watchdogTimeout ?? defaultWatchDog
);
} }
} }
} }
@ -120,12 +112,29 @@ export class Monitor {
public closeStreams() { public closeStreams() {
for (const key in this._streamsByRoom) { for (const key in this._streamsByRoom) {
for (const stream of this._streamsByRoom[key]) { for (const stream of this._streamsByRoom[key]) {
stream.close(); stream.rtsp.close();
//Stop watchdog timer
if (stream.watchdogTimer) {
clearTimeout(stream.watchdogTimer);
}
} }
} }
} }
private onData = async (room: string, args: IStreamEventArgs) => { private onData = async (
room: string,
stream: IStream,
args: IStreamEventArgs
) => {
//Reset watchdog timer for the stream
clearTimeout(stream.watchdogTimer!);
stream.watchdogTimer = setTimeout(
() => this.onWatchdogTimeout(stream, room),
this._config.watchdogTimeout ?? 30000
);
//Detect faces in image
const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData; const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData;
const out = faceapi.createCanvasFromMedia(input); const out = faceapi.createCanvasFromMedia(input);
const resultsQuery = await faceapi const resultsQuery = await faceapi
@ -148,14 +157,60 @@ export class Monitor {
label: bestMatch.label, label: bestMatch.label,
}); });
if (this._config.debug) {
this._logger.info(`Face Detected: ${bestMatch.label} in room ${room}`); this._logger.info(`Face Detected: ${bestMatch.label} in room ${room}`);
} }
}
}; };
private onError = (args: IErrorEventArgs, streamName: string) => { private getNewStream(connectionString: string, roomName: string): IStream {
this._logger.info(`[${streamName}] ${args.message}`); const stream = {
rtsp: new Rtsp(connectionString, {
rate: this._config.rate ?? defaultRate,
image: true,
}),
watchdogTimer: null,
detectionTimer: null,
connectionString: connectionString,
}; };
private onExit = (streamName: string, args: ICloseEventArgs) => {
this._logger.info(`[${streamName}] Stream has exited: ${args.message}`); //Subscribe to rtsp events
stream.rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) =>
this.onData(roomName, stream, args)
);
stream.rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => {
this._logger.info(
`[${connectionString}] Stream has exited: ${args.message}`
);
});
stream.rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) => {
this._logger.info(`[${connectionString}] ${args.message}`);
});
if (this._config.debug) {
stream.rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => {
this._logger.info(`[${connectionString}] ${args.message}`);
});
}
return stream;
}
private onWatchdogTimeout = async (stream: IStream, roomName: string) => {
this._logger.info(
`[${stream.connectionString}] Watchdog timeout: restarting stream`
);
//Close and remove old stream
stream.rtsp.close();
this._streamsByRoom[roomName].splice(
this._streamsByRoom[roomName].indexOf(stream),
1
);
//Create and add new stream
this._streamsByRoom[roomName].push(
this.getNewStream(stream.connectionString, roomName)
);
stream.rtsp.start();
}; };
} }

View File

@ -0,0 +1 @@
export type MonitorState = { [label: string]: string | null };

8
src/monitor/stream.ts Normal file
View File

@ -0,0 +1,8 @@
import { Rtsp } from "../rtsp/rtsp";
export interface IStream {
rtsp: Rtsp;
connectionString: string;
watchdogTimer: NodeJS.Timeout | null;
detectionTimer: NodeJS.Timeout | null;
}