2020-12-12 00:14:56 -05:00

237 lines
6.4 KiB
TypeScript

import { FaceMatcher } from "@vladmandic/face-api";
import { IRoom } from "../config";
import {
Rtsp,
IStreamEventArgs,
ICloseEventArgs,
IErrorEventArgs,
IMessageEventArgs,
} from "../rtsp/rtsp";
import canvas from "canvas";
import * as faceapi from "@vladmandic/face-api";
import { getFaceDetectorOptions, saveFile } from "../common";
import { nets } from "@vladmandic/face-api";
import { Logger } from "homebridge";
import { Event } from "../events";
import { IConfig } from "../config";
import { MonitorState } from "./monitorState";
import { IStream } from "./stream";
const { Canvas, Image, ImageData } = canvas;
const defaultWatchDog = 30000;
const defaultRate = 0.7;
export interface IStateChangeEventArgs {
label: string;
old: string | null;
new: string;
}
export class Monitor {
private _state: MonitorState = {};
private _streamsByRoom: { [roomName: string]: Array<IStream> } = {};
private _faceDetectionNet = nets.ssdMobilenetv1;
private _stateChangedEvent: Event<this, IStateChangeEventArgs>;
constructor(
rooms: Array<IRoom>,
private _matcher: FaceMatcher,
private _logger: Logger,
private _config: IConfig
) {
this._stateChangedEvent = new Event();
//Initialize state
for (const room of rooms) {
this._streamsByRoom[room.name] = [
...room.rtspConnectionStrings.map((connectionString) => {
return this.getNewStream(connectionString, room.name);
}),
];
_matcher.labeledDescriptors.forEach((descriptor) => {
this._state[descriptor.label] = null;
});
}
}
/**
* @method getState
*
* @param label The name of the label to retrieve state for
*
* The last known room of the requested label
*/
public getState(label: string): string | null {
return this._state[label];
}
public resetState(label: string): Monitor {
this._state[label] = null;
return this;
}
/**
* @property labels
*
* Gets the list of labels associated with the monitor
*/
public get labels(): Array<string> {
return this._matcher.labeledDescriptors
.map((descriptor) => descriptor.label)
.filter(
(label: string, index: number, array: Array<string>) =>
array.indexOf(label) === index
);
}
public get stateChangedEvent(): Event<this, IStateChangeEventArgs> {
return this._stateChangedEvent;
}
/**
* @method startStreams
*
* Starts monitoring rtsp streams
*/
public startStreams(): Monitor {
for (const key in this._streamsByRoom) {
for (const stream of this._streamsByRoom[key]) {
//Start stream
stream.rtsp.start();
//Start watchdog timer
stream.watchdogTimer = setTimeout(
() => this.onWatchdogTimeout(stream, key),
this._config.watchdogTimeout ?? defaultWatchDog
);
}
}
return this;
}
/**
* @method closeStreams
*
* Stops monitoring rtsp streams
*/
public closeStreams(): Monitor {
for (const key in this._streamsByRoom) {
for (const stream of this._streamsByRoom[key]) {
stream.rtsp.close();
//Stop watchdog timer
if (stream.watchdogTimer) {
clearTimeout(stream.watchdogTimer);
}
}
}
return this;
}
private onData = async (
room: string,
stream: IStream,
args: IStreamEventArgs
) => {
//Reset watchdog timer for the stream
clearTimeout(stream.watchdogTimer!);
stream.watchdogTimer = setTimeout(
() => this.onWatchdogTimeout(stream, room),
this._config.watchdogTimeout ?? 30000
);
//Detect faces in image
const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData;
const out = faceapi.createCanvasFromMedia(input);
const resultsQuery = await faceapi
.detectAllFaces(out, getFaceDetectorOptions(this._faceDetectionNet))
.withFaceLandmarks()
.withFaceDescriptors();
//Write to output image
if (this._config.writeOutput) {
await saveFile(this._config.outputDirectory, room + ".jpg", args.data);
}
for (const res of resultsQuery) {
const bestMatch = this._matcher.matchDescriptor(res.descriptor);
const old = this._state[bestMatch.label];
this._state[bestMatch.label] = room;
this._stateChangedEvent.fire(this, {
old: old,
new: room,
label: bestMatch.label,
});
if (this._config.debug) {
this._logger.info(`Face Detected: ${bestMatch.label} in room ${room}`);
}
}
};
private getNewStream(connectionString: string, roomName: string): IStream {
const stream = {
rtsp: new Rtsp(connectionString, {
rate: this._config.rate ?? defaultRate,
image: true,
}),
watchdogTimer: null,
detectionTimer: null,
connectionString: connectionString,
};
connectionString = this.getRedactedConnectionString(connectionString);
//Subscribe to rtsp events
stream.rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) =>
this.onData(roomName, stream, args)
);
//Only subscribe to these events if debug
if (this._config.debug) {
stream.rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => {
this._logger.info(`[${connectionString}] ${args.message}`);
});
stream.rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) => {
this._logger.info(`[${connectionString}] ${args.message}`);
});
stream.rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) => {
this._logger.info(
`[${connectionString}] Stream has exited: ${args.message}`
);
});
}
return stream;
}
private onWatchdogTimeout = async (stream: IStream, roomName: string) => {
this._logger.info(
`[${stream.connectionString}] Watchdog timeout: restarting stream`
);
//Close and remove old stream
stream.rtsp.close();
this._streamsByRoom[roomName].splice(
this._streamsByRoom[roomName].indexOf(stream),
1
);
//Create and add new stream
this._streamsByRoom[roomName].push(
this.getNewStream(stream.connectionString, roomName)
);
stream.rtsp.start();
};
private getRedactedConnectionString(connectionString: string) {
const pwSepIdx = connectionString.lastIndexOf(":") + 1;
const pwEndIdx = connectionString.indexOf("@");
return (
connectionString.substring(0, pwSepIdx) +
connectionString.substring(pwEndIdx)
);
}
}