using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Aurora.Proto.Events;
using Aurora.Services;
using Aurora.Services.EventManager;

namespace Aurora.RemoteImpl
{
    /// <summary>
    /// gRPC service implementation that lets remote clients subscribe to events
    /// and receive them over a server stream. Clients are keyed in
    /// <c>EventManager</c> by "<c>peer:clientId</c>".
    /// </summary>
    public class RemoteEventServiceImpl : RemoteEventService.RemoteEventServiceBase
    {
        public RemoteEventServiceImpl()
        {
        }

        /// <summary>
        /// RPC for getting the event stream for a particular client.
        /// Registers a handler with <c>EventManager</c> that forwards each event to
        /// <paramref name="responseStream"/>, then keeps the call open until the
        /// call's cancellation token is signalled.
        /// </summary>
        /// <param name="request">The events request; only <c>ClientId</c> is read.</param>
        /// <param name="responseStream">The response stream events are written to.</param>
        /// <param name="context">gRPC client context.</param>
        /// <returns>A task that completes once the call has been cancelled.</returns>
        public override async Task GetEvents(EventsRequest request, Grpc.Core.IServerStreamWriter<BaseEvent> responseStream, Grpc.Core.ServerCallContext context)
        {
            string peerId = Combine(new string[] { context.Peer, request.ClientId });
            Console.WriteLine(string.Format("SERVER - Events request received from peer: {0}", peerId));

            Action<BaseEvent> callback = (BaseEvent bEvent) =>
            {
                // TODO: the handler is never removed from EventManager once the stream
                // ends (client crashed or stopped); no removal API is visible from this
                // file. Until that is wired up, at least stop writing to a dead stream.
                if (context.CancellationToken.IsCancellationRequested)
                {
                    return;
                }

                Console.WriteLine(string.Format("SERVER - Event fired for peer: {0}", peerId));

                // The handler is synchronous, so the write cannot be awaited here;
                // observe failures so a broken stream does not surface as an
                // unobserved task exception.
                responseStream.WriteAsync(bEvent).ContinueWith(
                    t => Console.WriteLine(string.Format(
                        "SERVER - Failed to write event to peer: {0} ({1})",
                        peerId,
                        t.Exception.GetBaseException().Message)),
                    TaskContinuationOptions.OnlyOnFaulted);
            };

            EventManager.Instance.AddEventHandler(callback, peerId);

            try
            {
                // Keep the server stream open without blocking a thread; the delay
                // only ends when the call's cancellation token is signalled.
                await Task.Delay(Timeout.Infinite, context.CancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Expected: the call ended, so complete the RPC normally.
            }
        }

        /// <summary>
        /// RPC for subscribing to remote events.
        /// </summary>
        /// <param name="request">SubscribeRequest carrying the client id and event types.</param>
        /// <param name="context">gRPC client context.</param>
        /// <returns>A response with <c>Successful</c> set to <c>true</c>.</returns>
        public override Task<SubscriptionResponse> SubscribeToEvents(SubscribeRequest request, Grpc.Core.ServerCallContext context)
        {
            Console.WriteLine(string.Format("SERVER - Subscription from client with id: {0}", request.ClientId));

            EventManager.Instance.AddSubscriptionList(
                Combine(new string[] { context.Peer, request.ClientId }),
                request.EventTypes.ToList());

            return Task.FromResult(new SubscriptionResponse { Successful = true });
        }

        /// <summary>
        /// RPC for unsubscribing from events.
        /// </summary>
        /// <param name="request">UnsubscribeRequest carrying the client id and event types.</param>
        /// <param name="context">gRPC client context.</param>
        /// <returns>A response with <c>Successful</c> set to <c>true</c>.</returns>
        public override Task<SubscriptionResponse> UnsubscribeFromEvents(UnsubscribeRequest request, Grpc.Core.ServerCallContext context)
        {
            // Materialize the requested types directly; copying into a null array
            // (as the previous implementation did) always threw.
            EventManager.Instance.RemoveSubscriptionList(
                Combine(new string[] { context.Peer, request.ClientId }),
                request.EventTypes.ToList());

            return Task.FromResult(new SubscriptionResponse { Successful = true });
        }

        /// <summary>
        /// RPC for unsubscribing from all events.
        /// </summary>
        /// <param name="request">UnsubscribeAllRequest carrying the client id.</param>
        /// <param name="context">gRPC client context.</param>
        /// <returns>A response with <c>Successful</c> set to <c>true</c>.</returns>
        public override Task<SubscriptionResponse> UnsubscribeFromAll(UnsubscribeAllRequest request, Grpc.Core.ServerCallContext context)
        {
            EventManager.Instance.RemoveAllSubscriptions(Combine(new string[] { context.Peer, request.ClientId }));

            return Task.FromResult(new SubscriptionResponse { Successful = true });
        }

        /// <summary>
        /// Joins the given parts with ':' separators, e.g. { "a", "b" } becomes "a:b".
        /// </summary>
        /// <param name="args">The parts to join, in order.</param>
        /// <returns>The joined string; empty when <paramref name="args"/> has no elements.</returns>
        private static string Combine(string[] args)
        {
            // string.Join separates by position, so parts that happen to equal the
            // last part still get their separator.
            return string.Join(":", args);
        }
    }
}