Successful stream stress test

Need to add stream watchdog timer and accessory timeout timer
This commit is contained in:
watsonb8 2020-12-11 00:02:10 -05:00
parent 4e873edc97
commit 5cec734a09
8 changed files with 96 additions and 48 deletions

View File

@ -1,4 +1,4 @@
import { Rtsp } from "rtsp-stream/lib";
import { Rtsp, IStreamEventArgs } from "../src/rtsp/rtsp";
import { nets } from "@vladmandic/face-api";
import * as faceapi from "@vladmandic/face-api";
import canvas from "canvas";
@ -37,10 +37,10 @@ const main = async () => {
const content = JSON.parse(raw);
const matcher = faceapi.FaceMatcher.fromJSON(content);
rtsp.on("data", async (data: Buffer) => {
const input = ((await canvas.loadImage(data)) as unknown) as ImageData;
rtsp.dataEvent.push(async (sender: Rtsp, args: IStreamEventArgs) => {
const input = ((await canvas.loadImage(args.data)) as unknown) as ImageData;
const out = faceapi.createCanvasFromMedia(input);
await saveFile("image.jpg", data);
await saveFile(process.env.OUT_DIR as string, "image.jpg", args.data);
const resultsQuery = await faceapi
.detectAllFaces(out, getFaceDetectorOptions(faceDetectionNet))
.withFaceLandmarks()
@ -52,10 +52,6 @@ const main = async () => {
}
});
rtsp.on("error", (err) => {
// console.log(err);
});
rtsp.start();
};

View File

@ -15,12 +15,33 @@ export const getFaceDetectorOptions = (net: faceapi.NeuralNetwork<any>) => {
: new faceapi.TinyFaceDetectorOptions({ inputSize, scoreThreshold });
};
export function saveFile(basePath: string, fileName: string, buf: Buffer) {
if (!fs.existsSync(basePath)) {
fs.mkdirSync(basePath);
}
fs.writeFileSync(path.resolve(basePath, fileName), buf, "base64");
export function saveFile(
basePath: string,
fileName: string,
buf: Buffer
): Promise<void> {
const writeFile = (): Promise<void> => {
return new Promise((resolve, reject) => {
fs.writeFile(path.resolve(basePath, fileName), buf, "base64", (err) => {
if (err) {
return reject(err);
}
resolve();
});
});
};
return new Promise(async (resolve, reject) => {
if (!fs.existsSync(basePath)) {
fs.mkdir(basePath, async (err) => {
if (err) {
return reject(err);
}
resolve(await writeFile());
});
} else {
resolve(await writeFile());
}
});
}
export const delay = (ms: number): Promise<void> => {

View File

@ -4,10 +4,12 @@ export interface IConfig extends PlatformConfig {
refImageDirectory: string;
trainedModelDirectory: string;
weightDirectory: string;
outputDirectory: string;
trainOnStartup: boolean;
rooms: Array<IRoom>;
detectionTimeout: number;
debug: boolean;
writeOutput: boolean;
}
export interface IRoom {
@ -27,6 +29,10 @@ export const isConfig = (object: any): object is IConfig => {
"refImageDirectory" in object &&
"trainedModelDirectory" in object &&
"weightDirectory" in object &&
"outputDirectory" in object &&
"trainOnStartup" in object &&
"detectionTimeout" in object &&
"writeOutput" in object &&
"rooms" in object &&
roomsOkay
);

View File

@ -104,7 +104,7 @@ export class HomeLocationPlatform implements DynamicPlatformPlugin {
this.config.rooms,
faceMatcher,
this.log,
this.config.debug
this.config
);
locationMonitor.startStreams();

View File

@ -1,7 +1,7 @@
import { API } from "homebridge";
import { PLATFORM_NAME } from "./settings";
import { HomeLocationPlatform } from "./platform";
import { HomeLocationPlatform } from "./homeLocationPlatform";
/**
* This method registers the platform with Homebridge

View File

@ -4,7 +4,7 @@ import {
PlatformAccessory,
} from "homebridge";
import { Monitor, IStateChangeEventArgs } from "./monitor";
import { HomeLocationPlatform } from "./platform";
import { HomeLocationPlatform } from "./homeLocationPlatform";
import { IRoom } from "./config";
/**

View File

@ -1,12 +1,19 @@
import { FaceMatcher } from "@vladmandic/face-api";
import { IRoom } from "./config";
import { Rtsp, IStreamEventArgs, ICloseEventArgs } from "./rtsp";
import {
Rtsp,
IStreamEventArgs,
ICloseEventArgs,
IErrorEventArgs,
IMessageEventArgs,
} from "./rtsp/rtsp";
import canvas from "canvas";
import * as faceapi from "@vladmandic/face-api";
import { getFaceDetectorOptions, saveFile } from "./common";
import { nets } from "@vladmandic/face-api";
import { Logger } from "homebridge";
import { Event } from "./events";
import { IConfig } from "./config";
const { Canvas, Image, ImageData } = canvas;
export type MonitorState = { [label: string]: string | null };
@ -26,7 +33,7 @@ export class Monitor {
private _rooms: Array<IRoom>,
private _matcher: FaceMatcher,
private _logger: Logger,
private _isDebug: boolean
private _config: IConfig
) {
this._stateChangedEvent = new Event();
@ -35,7 +42,7 @@ export class Monitor {
this._streamsByRoom[room.name] = [
...room.rtspConnectionStrings.map((connectionString) => {
const rtsp = new Rtsp(connectionString, {
rate: 1,
rate: 0.7,
image: true,
});
rtsp.dataEvent.push((sender: Rtsp, args: IStreamEventArgs) =>
@ -44,6 +51,15 @@ export class Monitor {
rtsp.closeEvent.push((sender: Rtsp, args: ICloseEventArgs) =>
this.onExit(connectionString, args)
);
rtsp.errorEvent.push((sender: Rtsp, args: IErrorEventArgs) =>
this.onError(args, connectionString)
);
if (this._config.debug) {
rtsp.messageEvent.push((sender: Rtsp, args: IMessageEventArgs) => {
this._logger.info(`[${connectionString}] ${args.message}`);
});
}
return rtsp;
}),
];
@ -116,25 +132,10 @@ export class Monitor {
.detectAllFaces(out, getFaceDetectorOptions(this._faceDetectionNet))
.withFaceLandmarks()
.withFaceDescriptors();
if (this._isDebug) {
switch (room) {
case "Kitchen": {
saveFile(
"/Users/brandonwatson/Documents/Git/Gitea/homebridge-face-location/out",
"Kitchen.jpg",
args.data
);
break;
}
case "LivingRoom": {
saveFile(
"/Users/brandonwatson/Documents/Git/Gitea/homebridge-face-location/out",
"LivingRoom.jpg",
args.data
);
break;
}
}
//Write to output image
if (this._config.writeOutput) {
await saveFile(this._config.outputDirectory, room + ".jpg", args.data);
}
for (const res of resultsQuery) {
@ -151,8 +152,8 @@ export class Monitor {
}
};
private onError = (error: string, streamName: string) => {
this._logger.info(`[${streamName}] ${error}`);
private onError = (args: IErrorEventArgs, streamName: string) => {
this._logger.info(`[${streamName}] ${args.message}`);
};
private onExit = (streamName: string, args: ICloseEventArgs) => {
this._logger.info(`[${streamName}] Stream has exited: ${args.message}`);

View File

@ -18,6 +18,10 @@ export interface IErrorEventArgs {
err?: Error;
}
export interface IMessageEventArgs {
message: string;
}
export class Rtsp {
private _connecteionString: string;
private _childProcess: ChildProcess | undefined;
@ -29,6 +33,7 @@ export class Rtsp {
private _dataEvent: Event<this, IStreamEventArgs>;
private _closeEvent: Event<this, ICloseEventArgs>;
private _errorEvent: Event<this, IErrorEventArgs>;
private _messageEvent: Event<this, IMessageEventArgs>;
constructor(connectionString: string, options: IOptions) {
this._started = false;
@ -40,6 +45,7 @@ export class Rtsp {
this._dataEvent = new Event();
this._closeEvent = new Event();
this._errorEvent = new Event();
this._messageEvent = new Event();
this.onData = this.onData.bind(this);
}
@ -56,6 +62,10 @@ export class Rtsp {
return this._dataEvent;
}
public get messageEvent(): Event<this, IMessageEventArgs> {
return this._messageEvent;
}
public get closeEvent(): Event<this, ICloseEventArgs> {
return this._closeEvent;
}
@ -68,6 +78,7 @@ export class Rtsp {
const argStrings = [
`-i ${this._connecteionString}`,
`-r ${this._options.rate ?? 10}`,
`-vf mpdecimate,setpts=N/FRAME_RATE/TB`,
this._options.image
? `-f image2`
: `-codec:v ${this._options.codec ?? "libx264"}`,
@ -81,19 +92,32 @@ export class Rtsp {
}
this._childProcess.stdout?.on("data", this.onData);
this._childProcess.stdout?.on("error", (err) =>
console.log("And error occurred" + err)
);
this._childProcess.stdout?.on("close", () => console.log("Stream closed"));
this._childProcess.stdout?.on("end", () => console.log("Stream ended"));
//Only register this event if there are subscribers
if (this._childProcess.stderr && this._messageEvent.length > 0) {
this._childProcess.stderr.on("data", this.onMessage);
}
this._childProcess.on("close", (code: number, signal: NodeJS.Signals) =>
this._closeEvent.fire(this, {
message: "FFmpeg exited with code: " + code + " and signal: " + signal,
})
);
this._childProcess.on("exit", (code: number, signal: NodeJS.Signals) =>
this._closeEvent.fire(this, {
message: "FFmpeg exited with code: " + code + " and signal: " + signal,
})
);
this._childProcess.on("error", (error: Error) =>
this._errorEvent.fire(this, { err: error })
);
//Only register this event if there are subscribers
if (this._childProcess.stderr && this._errorEvent.length > 0) {
this._childProcess.stderr.on("data", this.onStdErrorData);
}
}
public close(): void {
@ -113,7 +137,7 @@ export class Rtsp {
return this._childProcess ? this._childProcess.stdin : null;
}
private onStdErrorData = (data: any): void => {
private onMessage = (data: any): void => {
if (!this._started) {
this._started = true;
}
@ -125,7 +149,7 @@ export class Rtsp {
msg += `${line}\n`;
});
this._errorEvent.fire(this, { message: msg });
this._messageEvent.fire(this, { message: msg });
};
private onData(data: Buffer): void {