using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Aurora.Proto.Events;
using Aurora.Services;
using Aurora.Services.EventManager;

namespace Aurora.RemoteImpl
{
    /// <summary>
    /// gRPC service implementation that lets remote clients subscribe to event
    /// types and receive matching events over a server stream. All bookkeeping
    /// is delegated to <c>EventManager.Instance</c>; clients are keyed by
    /// "peer:clientId".
    /// </summary>
    public class RemoteEventServiceImpl : RemoteEventService.RemoteEventServiceBase
    {
        public RemoteEventServiceImpl()
        {
        }

        /// <summary>
        /// RPC for getting the event stream for a particular client. Registers a
        /// handler with the event manager and keeps the stream open until the
        /// call is cancelled.
        /// </summary>
        /// <param name="request">Request carrying the client id</param>
        /// <param name="responseStream">The response stream events are written to</param>
        /// <param name="context">gRPC client context</param>
        /// <returns>A task that completes once the call has been cancelled</returns>
        public async override Task GetEvents(EventsRequest request, Grpc.Core.IServerStreamWriter<BaseEvent> responseStream, Grpc.Core.ServerCallContext context)
        {
            string peerId = Combine(new string[] { context.Peer, request.ClientId });
            Console.WriteLine("SERVER - Events request received from peer: {0}", peerId);

            // The response stream accepts only one write at a time, so every write
            // is chained behind the previous one. The gate protects the chain head
            // because the callback may be invoked from more than one thread.
            object writeGate = new object();
            Task pendingWrite = Task.FromResult(true);

            Action<BaseEvent> callback = (BaseEvent bEvent) =>
            {
                Console.WriteLine("SERVER - Event fired for peer: {0}", peerId);

                // Once the call has ended the stream can no longer be written to.
                if (context.CancellationToken.IsCancellationRequested)
                {
                    return;
                }

                lock (writeGate)
                {
                    pendingWrite = WriteAfterAsync(pendingWrite, responseStream, bEvent, peerId);
                }
            };

            EventManager.Instance.AddEventHandler(callback, peerId);

            // NOTE(review): the handler is never removed because no removal API is
            // visible from this file; if EventManager exposes one, call it after
            // the wait below so handlers do not accumulate.
            try
            {
                // Keep the stream open without blocking a thread; the delay only
                // ends when the call's cancellation token is signalled.
                await Task.Delay(Timeout.Infinite, context.CancellationToken);
            }
            catch (OperationCanceledException)
            {
                // Expected: cancellation is the normal way for this stream to end.
            }
        }

        /// <summary>
        /// RPC for subscribing to remote events
        /// </summary>
        /// <param name="request">SubscribeRequest</param>
        /// <param name="context">gRPC client context</param>
        /// <returns>A response with <c>Successful</c> set to true</returns>
        public override Task<SubscriptionResponse> SubscribeToEvents(SubscribeRequest request, Grpc.Core.ServerCallContext context)
        {
            Console.WriteLine("SERVER - Subscription from client with id: {0}", request.ClientId);

            string peerId = Combine(new string[] { context.Peer, request.ClientId });
            EventManager.Instance.AddSubscriptionList(peerId, request.EventTypes.ToList());

            return Task.FromResult(new SubscriptionResponse { Successful = true });
        }

        /// <summary>
        /// RPC for unsubscribing from events
        /// </summary>
        /// <param name="request">UnsubscribeRequest</param>
        /// <param name="context">gRPC client context</param>
        /// <returns>A response with <c>Successful</c> set to true</returns>
        public override Task<SubscriptionResponse> UnsubscribeFromEvents(UnsubscribeRequest request, Grpc.Core.ServerCallContext context)
        {
            // Materialise the requested types directly; copying into a null array
            // (as the previous implementation did) throws on every call.
            List<EventType> eventTypes = request.EventTypes.ToList();

            string peerId = Combine(new string[] { context.Peer, request.ClientId });
            EventManager.Instance.RemoveSubscriptionList(peerId, eventTypes);

            return Task.FromResult(new SubscriptionResponse { Successful = true });
        }

        /// <summary>
        /// RPC for unsubscribing from all events
        /// </summary>
        /// <param name="request">UnsubscribeAllRequest</param>
        /// <param name="context">gRPC client context</param>
        /// <returns>A response with <c>Successful</c> set to true</returns>
        public override Task<SubscriptionResponse> UnsubscribeFromAll(UnsubscribeAllRequest request, Grpc.Core.ServerCallContext context)
        {
            string peerId = Combine(new string[] { context.Peer, request.ClientId });
            EventManager.Instance.RemoveAllSubscriptions(peerId);

            return Task.FromResult(new SubscriptionResponse { Successful = true });
        }

        /// <summary>
        /// Writes one event to the response stream after the previous write has
        /// finished. Failures are logged rather than thrown so the write chain
        /// never faults and later events can still be attempted.
        /// </summary>
        /// <param name="previous">The write this one must wait for</param>
        /// <param name="responseStream">The response stream to write to</param>
        /// <param name="bEvent">The event to send</param>
        /// <param name="peerId">Client key, used for logging only</param>
        /// <returns>A task that completes when this write has finished or failed</returns>
        private static async Task WriteAfterAsync(Task previous, Grpc.Core.IServerStreamWriter<BaseEvent> responseStream, BaseEvent bEvent, string peerId)
        {
            await previous;

            try
            {
                await responseStream.WriteAsync(bEvent);
            }
            catch (Exception ex)
            {
                Console.WriteLine("SERVER - Failed to send event to peer: {0} ({1})", peerId, ex.Message);
            }
        }

        /// <summary>
        /// Joins the given parts with ':' separators.
        /// </summary>
        /// <param name="args">The parts to join, in order</param>
        /// <returns>The joined string; empty when <paramref name="args"/> is empty</returns>
        private static string Combine(string[] args)
        {
            // string.Join separates by position. The previous loop compared each
            // value with the last one, which dropped the separator whenever an
            // earlier part happened to equal the last part.
            return string.Join(":", args);
        }
    }
}