import { FaceMatcher } from "@vladmandic/face-api";
import { IRoom } from "../config";
import {
  Rtsp,
  IStreamEventArgs,
  ICloseEventArgs,
  IErrorEventArgs,
  IMessageEventArgs,
} from "../rtsp/rtsp";
import canvas from "canvas";
import * as faceapi from "@vladmandic/face-api";
import { getFaceDetectorOptions, saveFile } from "../common";
import { nets } from "@vladmandic/face-api";
import { Logger } from "homebridge";
import { Event } from "../events";
import { IConfig } from "../config";
import { MonitorState } from "./monitorState";
import { IStream } from "./stream";

const { Canvas, Image, ImageData } = canvas;

/** Watchdog delay, in milliseconds, used when the config does not set `watchdogTimeout`. */
const defaultWatchDog = 30000;

/**
 * Value passed as the `rate` option of each Rtsp stream when the config does
 * not set `rate`.
 * NOTE(review): presumably frames per second; confirm against the Rtsp class.
 */
const defaultRate = 0.7;

/**
 * Payload fired on the monitor's state-changed event whenever a face is
 * matched in a frame.
 */
export interface IStateChangeEventArgs {
  /** Label of the matched face. */
  label: string;
  /** Room previously recorded for the label, or null if none was recorded. */
  old: string | null;
  /** Room in which the label has just been matched. */
  new: string;
}

/**
 * Watches the rtsp streams of a set of rooms, runs face detection on every
 * frame received and records, per label, the room in which that label was
 * last matched.
 */
export class Monitor {
  private readonly _state: MonitorState = {};
  private readonly _streamsByRoom: { [roomName: string]: IStream[] } = {};
  private readonly _faceDetectionNet = nets.ssdMobilenetv1;
  // NOTE(review): `Event` is declared in ../events, which is not visible here;
  // if it is generic, supply its type arguments on this field and on the getter.
  private readonly _stateChangedEvent: Event;

  /**
   * @param rooms Rooms to monitor; one stream is created per connection string
   * @param _matcher Matcher used to label detected faces
   * @param _logger Logger used for diagnostics
   * @param _config Plugin configuration
   */
  constructor(
    rooms: IRoom[],
    private _matcher: FaceMatcher,
    private _logger: Logger,
    private _config: IConfig
  ) {
    this._stateChangedEvent = new Event();

    //Initialize state: every known label starts without a room. This does not
    //depend on the rooms, so it is done once rather than once per room.
    for (const descriptor of _matcher.labeledDescriptors) {
      this._state[descriptor.label] = null;
    }

    //Create (but do not start) one stream per connection string
    for (const room of rooms) {
      this._streamsByRoom[room.name] = room.rtspConnectionStrings.map(
        (connectionString) => this.getNewStream(connectionString, room.name)
      );
    }
  }

  /**
   * @method getState
   *
   * @param label The name of the label to retrieve state for
   *
   * @returns The last known room of the requested label
   */
  public getState(label: string): string | null {
    return this._state[label];
  }

  /**
   * @method resetState
   *
   * Clears the last known room of a label.
   *
   * @param label The name of the label to reset
   *
   * @returns This monitor, to allow chaining
   */
  public resetState(label: string): Monitor {
    this._state[label] = null;
    return this;
  }

  /**
   * @property labels
   *
   * Gets the list of distinct labels associated with the monitor
   */
  public get labels(): string[] {
    return this._matcher.labeledDescriptors
      .map((descriptor) => descriptor.label)
      .filter((label, index, all) => all.indexOf(label) === index);
  }

  /**
   * @property stateChangedEvent
   *
   * Event fired, with an IStateChangeEventArgs payload, for every face
   * matched in a frame
   */
  public get stateChangedEvent(): Event {
    return this._stateChangedEvent;
  }

  /**
   * @method startStreams
   *
   * Starts monitoring rtsp streams and arms a watchdog timer for each of them
   *
   * @returns This monitor, to allow chaining
   */
  public startStreams(): Monitor {
    for (const roomName in this._streamsByRoom) {
      for (const stream of this._streamsByRoom[roomName]) {
        stream.rtsp.start();
        this.armWatchdog(stream, roomName);
      }
    }
    return this;
  }

  /**
   * @method closeStreams
   *
   * Stops monitoring rtsp streams and cancels their watchdog timers
   *
   * @returns This monitor, to allow chaining
   */
  public closeStreams(): Monitor {
    for (const roomName in this._streamsByRoom) {
      for (const stream of this._streamsByRoom[roomName]) {
        stream.rtsp.close();
        if (stream.watchdogTimer) {
          clearTimeout(stream.watchdogTimer);
          //Drop the stale handle so it is not cleared a second time later
          stream.watchdogTimer = null;
        }
      }
    }
    return this;
  }

  /**
   * (Re)arms the watchdog timer of a stream, cancelling any pending one first.
   * The delay is the configured `watchdogTimeout`, or `defaultWatchDog` when
   * it is not set.
   */
  private armWatchdog(stream: IStream, roomName: string): void {
    if (stream.watchdogTimer) {
      clearTimeout(stream.watchdogTimer);
    }
    stream.watchdogTimer = setTimeout(
      () => this.onWatchdogTimeout(stream, roomName),
      this._config.watchdogTimeout ?? defaultWatchDog
    );
  }

  /**
   * Handles one frame from a stream: resets the stream's watchdog, detects and
   * labels the faces in the frame, optionally writes the frame to disk, and
   * records and announces the room of every matched label.
   *
   * Failures are logged rather than rethrown because this runs as an event
   * handler, where a rejection would otherwise go unhandled.
   */
  private onData = async (
    room: string,
    stream: IStream,
    args: IStreamEventArgs
  ): Promise<void> => {
    //Receiving data proves the stream is alive: reset its watchdog timer
    this.armWatchdog(stream, room);

    try {
      //Detect faces in image
      const input = ((await canvas.loadImage(
        args.data
      )) as unknown) as ImageData;
      const out = faceapi.createCanvasFromMedia(input);
      const resultsQuery = await faceapi
        .detectAllFaces(out, getFaceDetectorOptions(this._faceDetectionNet))
        .withFaceLandmarks()
        .withFaceDescriptors();

      //Write to output image
      if (this._config.writeOutput) {
        await saveFile(this._config.outputDirectory, room + ".jpg", args.data);
      }

      for (const res of resultsQuery) {
        const bestMatch = this._matcher.matchDescriptor(res.descriptor);
        const old = this._state[bestMatch.label];
        this._state[bestMatch.label] = room;
        this._stateChangedEvent.fire(this, {
          old: old,
          new: room,
          label: bestMatch.label,
        });

        if (this._config.debug) {
          this._logger.info(
            `Face Detected: ${bestMatch.label} in room ${room}`
          );
        }
      }
    } catch (e) {
      const reason = e instanceof Error ? e.message : String(e);
      this._logger.error(`[${room}] Failed to process frame: ${reason}`);
    }
  };

  /**
   * Creates a stream for a connection string and subscribes to its events.
   * The stream is neither started nor given a watchdog here.
   *
   * @param connectionString The rtsp connection string of the stream
   * @param roomName The room the stream belongs to
   */
  private getNewStream(connectionString: string, roomName: string): IStream {
    const stream = {
      rtsp: new Rtsp(connectionString, {
        rate: this._config.rate ?? defaultRate,
        image: true,
      }),
      watchdogTimer: null,
      detectionTimer: null,
      connectionString: connectionString,
    };

    //Log messages must never contain the stream password
    const redacted = this.getRedactedConnectionString(connectionString);

    //Subscribe to rtsp events
    stream.rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) =>
      this.onData(roomName, stream, args)
    );

    //Only subscribe to these events if debug
    if (this._config.debug) {
      stream.rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => {
        this._logger.info(`[${redacted}] ${args.message}`);
      });
      stream.rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) => {
        this._logger.info(`[${redacted}] ${args.message}`);
      });
      stream.rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => {
        this._logger.info(`[${redacted}] Stream has exited: ${args.message}`);
      });
    }

    return stream;
  }

  /**
   * Runs when a stream has delivered no data for the watchdog delay: closes
   * the stream and replaces it with a freshly created one for the same
   * connection string, which is started and given its own watchdog.
   */
  private onWatchdogTimeout = (stream: IStream, roomName: string): void => {
    this._logger.info(
      `[${this.getRedactedConnectionString(
        stream.connectionString
      )}] Watchdog timeout: restarting stream`
    );

    //Close the old stream; its timer has just fired, so the handle is stale
    stream.rtsp.close();
    stream.watchdogTimer = null;

    const streams = this._streamsByRoom[roomName];
    const index = streams ? streams.indexOf(stream) : -1;
    if (index === -1) {
      //The stream is no longer tracked (it was already replaced). Replacing it
      //again would duplicate the stream, and splice(-1, 1) would remove an
      //unrelated one.
      return;
    }

    //Swap in a new stream, then start and watch the NEW stream
    const replacement = this.getNewStream(stream.connectionString, roomName);
    streams.splice(index, 1);
    streams.push(replacement);
    replacement.rtsp.start();
    this.armWatchdog(replacement, roomName);
  };

  /**
   * Returns the connection string with the password removed, for logging.
   * `rtsp://user:pass@host:554/x` becomes `rtsp://user:@host:554/x`. A string
   * without a `user:password@` part is returned unchanged.
   *
   * @param connectionString The connection string to redact
   */
  private getRedactedConnectionString(connectionString: string): string {
    //The credentials live in the authority: after "://" and before the first "/"
    const schemeSepIdx = connectionString.indexOf("://");
    const authorityStart = schemeSepIdx === -1 ? 0 : schemeSepIdx + 3;
    const pathStart = connectionString.indexOf("/", authorityStart);
    const authorityEnd =
      pathStart === -1 ? connectionString.length : pathStart;

    //Last "@" of the authority ends the user info; the first ":" inside the
    //user info separates user and password. The port colon comes after the
    //"@", so it must not be mistaken for the password separator.
    const userInfoEnd = connectionString.lastIndexOf("@", authorityEnd);
    const pwSepIdx = connectionString.indexOf(":", authorityStart);
    if (
      userInfoEnd < authorityStart ||
      pwSepIdx === -1 ||
      pwSepIdx > userInfoEnd
    ) {
      return connectionString;
    }

    return (
      connectionString.substring(0, pwSepIdx + 1) +
      connectionString.substring(userInfoEnd)
    );
  }
}