diff --git a/Aurora/Aurora.csproj b/Aurora/Aurora.csproj index a6be19b..b46e96e 100644 --- a/Aurora/Aurora.csproj +++ b/Aurora/Aurora.csproj @@ -93,5 +93,7 @@ Include="Proto\party.proto"/> + \ No newline at end of file diff --git a/Aurora/Design/Components/MemberList/MemberList.xaml.cs b/Aurora/Design/Components/MemberList/MemberList.xaml.cs index 3294e5b..aed88b8 100644 --- a/Aurora/Design/Components/MemberList/MemberList.xaml.cs +++ b/Aurora/Design/Components/MemberList/MemberList.xaml.cs @@ -2,7 +2,7 @@ using System.Collections.ObjectModel; using System.Collections.Specialized; using Xamarin.Forms; -using Aurora.Proto; +using Aurora.Proto.Party; namespace Aurora.Design.Components.MemberList { diff --git a/Aurora/Design/Views/Party/PartyViewModel.cs b/Aurora/Design/Views/Party/PartyViewModel.cs index e70f6e4..17cd9ce 100644 --- a/Aurora/Design/Views/Party/PartyViewModel.cs +++ b/Aurora/Design/Views/Party/PartyViewModel.cs @@ -1,7 +1,7 @@ using System; using System.Collections.ObjectModel; using Aurora.Executors; -using Aurora.Proto; +using Aurora.Proto.Party; using Xamarin.Forms; namespace Aurora.Design.Views.Party @@ -32,6 +32,15 @@ namespace Aurora.Design.Views.Party State(PartyState.SelectingHost); } + + ~PartyViewModel() + { + if (_executor != null) + { + _executor.Close(); + } + } + #region Properties public ObservableCollection Members diff --git a/Aurora/Executors/BaseExecutor.cs b/Aurora/Executors/BaseExecutor.cs index 6033039..f3ed4b8 100644 --- a/Aurora/Executors/BaseExecutor.cs +++ b/Aurora/Executors/BaseExecutor.cs @@ -2,8 +2,7 @@ using System; using System.Reflection; using System.Linq; using System.Collections.ObjectModel; -using Aurora.Proto; - +using Aurora.Proto.Party; namespace Aurora.Executors { public abstract class BaseExecutor diff --git a/Aurora/Executors/ClientExecutor.cs b/Aurora/Executors/ClientExecutor.cs index 60ce8e6..6f8aeff 100644 --- a/Aurora/Executors/ClientExecutor.cs +++ b/Aurora/Executors/ClientExecutor.cs @@ -1,8 +1,13 @@ using System; using System.Collections.ObjectModel; +using System.Threading.Tasks; +using System.Threading; using Grpc.Core; -using Aurora.Proto; -using Aurora.Models; +using wellKnown = Google.Protobuf.WellKnownTypes; +using Aurora.Proto.General; +using Aurora.Proto.Party; +using Aurora.Proto.Playback; +using Aurora.Proto.Events; using Aurora.Services; namespace Aurora.Executors { @@ -11,6 +16,8 @@ namespace Aurora.Executors private Channel _channel; private RemotePartyService.RemotePartyServiceClient _remotePartyClient; private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient; + private RemoteEventService.RemoteEventServiceClient _remoteEventsClient; + private Task _eventsTask; private ObservableCollection _partyMembers; @@ -27,16 +34,28 @@ namespace Aurora.Executors #endregion Properties + /// + /// Initiates the connection to a party. + /// + /// The hostname of the gRPC server public override void Connect(string hostname) { _channel = new Channel(string.Format("{0}:{1}", hostname, SettingsService.Instance.DefaultPort), ChannelCredentials.Insecure); _remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel); _remotePlaybackClient = new RemotePlaybackService.RemotePlaybackServiceClient(_channel); + _remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel); + + //Assign but don't start task + _eventsTask = new Task(GetEvents); JoinParty(); } + /// + /// Shutdown Connections + /// + /// public override async void Close() { await _channel.ShutdownAsync(); @@ -69,6 +88,28 @@ namespace Aurora.Executors UserName = SettingsService.Instance.Username, }); + RefreshMembers(); + + //Subscribe to events + SubscribeRequest req = new SubscribeRequest(); + req.EventTypes.Add(EventType.PartyMemberJoined); + req.EventTypes.Add(EventType.PartyMemberLeft); + try + { + _remoteEventsClient.SubscribeToEvents(req); + } + catch (Exception ex) + { + Console.WriteLine("Error subscribing to events: " + ex.Message); + } + + + _eventsTask.Start(); + + } + + private void RefreshMembers() + { MembersResponse resposne = _remotePartyClient.GetPartyMembers(new Empty()); //Add members foreach (PartyMember member in resposne.Members) @@ -85,5 +126,36 @@ namespace Aurora.Executors } } } + + private async void GetEvents() + { + + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + using (AsyncServerStreamingCall eventStream = _remoteEventsClient.GetEvents(new Empty())) + { + while (await eventStream.ResponseStream.MoveNext(cancellationTokenSource.Token)) + { + //Convert derived event type + BaseEvent e = eventStream.ResponseStream.Current; + switch (e.DerivedEventCase) + { + case BaseEvent.DerivedEventOneofCase.None: + { + throw new InvalidOperationException(); + } + case BaseEvent.DerivedEventOneofCase.PartyMemberJoinedEvent: + { + PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent; + break; + } + case BaseEvent.DerivedEventOneofCase.PartyMemberLeftEvent: + { + PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent; + break; + } + } + } + } + } } } \ No newline at end of file diff --git a/Aurora/Executors/HostExecutor.cs b/Aurora/Executors/HostExecutor.cs index 001d609..e95370f 100644 --- a/Aurora/Executors/HostExecutor.cs +++ b/Aurora/Executors/HostExecutor.cs @@ -4,7 +4,9 @@ using System.Collections.ObjectModel; using Aurora.Models; using Aurora.Executors; using Aurora.Services; -using Aurora.Proto; +using Aurora.Proto.Party; +using Aurora.Proto.Playback; +using Aurora.Proto.Events; using Aurora.RemoteImpl; namespace Aurora.Executors @@ -13,10 +15,12 @@ namespace Aurora.Executors { RemotePartyServiceImpl _remotePartyServiceImpl; RemotePlaybackServiceImpl _remotePlaybackImpl; + RemoteEventServiceImpl _remoteEventImpl; public HostExecutor() { _remotePartyServiceImpl = new RemotePartyServiceImpl(); _remotePlaybackImpl = new RemotePlaybackServiceImpl(); + _remoteEventImpl = new RemoteEventServiceImpl(); } public override void Connect(string hostname) @@ -27,6 +31,7 @@ namespace Aurora.Executors //Register grpc RemoteService with singleton server service ServerService.Instance.RegisterService(RemotePartyService.BindService(_remotePartyServiceImpl)); ServerService.Instance.RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl)); + ServerService.Instance.RegisterService(RemoteEventService.BindService(_remoteEventImpl)); //start gRPC server ServerService.Instance.Start(); diff --git a/Aurora/Proto/events.proto b/Aurora/Proto/events.proto new file mode 100644 index 0000000..461d94d --- /dev/null +++ b/Aurora/Proto/events.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package Aurora.Proto.Events; + +import "Proto/general.proto"; +import "Proto/party.proto"; + +service RemoteEventService { + //Party Service + rpc GetEvents(Aurora.Proto.General.Empty) returns (stream BaseEvent) {}; + rpc SubscribeToEvents(SubscribeRequest) returns(SubscriptionResponse); + rpc UnsubscribeFromEvents(UnsubscribeRequest) returns (SubscriptionResponse); + rpc UnsubscribeFromAll(UnsubscribeAllRequest) returns (SubscriptionResponse); +} + +/* Subscription messages */ +message SubscribeRequest { + repeated EventType eventTypes = 1; +} + +message UnsubscribeRequest { + repeated EventType eventTypes = 1; +} + +message UnsubscribeAllRequest { +} + +message SubscriptionResponse { + bool successful = 1; +} + +/* Event Types */ +enum EventType { + PartyMemberJoined = 0; + PartyMemberLeft = 1; +} +message BaseEvent { + EventType eventType = 1; + oneof derivedEvent { + PartyMemberJoinedEvent partyMemberJoinedEvent = 2; + PartyMemberLeftEvent partyMemberLeftEvent = 3; + } +} + +message PartyMemberJoinedEvent { + Aurora.Proto.Party.PartyMember member = 2; +} + +message PartyMemberLeftEvent { + Aurora.Proto.Party.PartyMember member = 2; +} \ No newline at end of file diff --git a/Aurora/Proto/general.proto b/Aurora/Proto/general.proto index 77b62ed..684e2f3 100644 --- a/Aurora/Proto/general.proto +++ b/Aurora/Proto/general.proto @@ -1,10 +1,10 @@ syntax = "proto3"; -package Aurora.Proto; +package Aurora.Proto.General; message Chunk { bytes Content = 1; } message Empty{ -} \ No newline at end of file +} diff --git a/Aurora/Proto/party.proto b/Aurora/Proto/party.proto index 625ab14..5024a3e 100644 --- a/Aurora/Proto/party.proto +++ b/Aurora/Proto/party.proto @@ -1,6 +1,6 @@ syntax = "proto3"; -package Aurora.Proto; +package Aurora.Proto.Party; import "Proto/general.proto"; @@ -8,7 +8,7 @@ service RemotePartyService { //Party Service rpc JoinParty(JoinPartyRequest) returns (JoinPartyResponse); rpc LeaveParty(LeavePartyRequest) returns (LeavePartyResponse); - rpc GetPartyMembers(Empty) returns (MembersResponse); + rpc GetPartyMembers(Aurora.Proto.General.Empty) returns (MembersResponse); } message JoinPartyRequest { diff --git a/Aurora/Proto/playback.proto b/Aurora/Proto/playback.proto index 22b741f..64dad2e 100644 --- a/Aurora/Proto/playback.proto +++ b/Aurora/Proto/playback.proto @@ -1,12 +1,12 @@ syntax = "proto3"; -package Aurora.Proto; +package Aurora.Proto.Playback; import "Proto/general.proto"; service RemotePlaybackService { //Playback Service - rpc GetPartyStream(Empty) returns (stream Chunk) {}; + rpc GetPartyStream(Aurora.Proto.General.Empty) returns (stream Aurora.Proto.General.Chunk) {}; } enum TransferStatusCode { diff --git a/Aurora/RemoteImpl/RemoteEventImpl.cs b/Aurora/RemoteImpl/RemoteEventImpl.cs new file mode 100644 index 0000000..2feb619 --- /dev/null +++ b/Aurora/RemoteImpl/RemoteEventImpl.cs @@ -0,0 +1,80 @@ +using System; +using System.Threading.Tasks; +using System.Collections.Generic; +using System.Linq; +using Aurora.Services; +using Aurora.Proto.Events; +using Aurora.Proto.General; + +namespace Aurora.RemoteImpl +{ + public class RemoteEventServiceImpl : RemoteEventService.RemoteEventServiceBase + { + public RemoteEventServiceImpl() + { + + } + + /// + /// RPC for getting event stream for a particular client. + /// + /// Empty + /// The response stream + /// gRPC client context + /// + public async override Task GetEvents(Empty request, Grpc.Core.IServerStreamWriter responseStream, Grpc.Core.ServerCallContext context) + { + while (EventManager.Instance.GetSubscriptionCount(context.Peer) > 0) + { + List events = EventManager.Instance.GetSessionEvents(context.Peer); + foreach (BaseEvent currentEvent in events) + { + await responseStream.WriteAsync(currentEvent); + } + } + } + + /// + /// RPC for subscribing to remote events + /// + /// SubscribeRequest + /// gRPC client context + /// + public override Task SubscribeToEvents(SubscribeRequest request, Grpc.Core.ServerCallContext context) + { + EventManager.Instance.AddSubscriptionList(context.Peer, request.EventTypes.ToList()); + + return Task.FromResult(new SubscriptionResponse { Successful = true }); + } + + /// + /// RPC for unsubscibing from events + /// + /// UnsubscribeRequest + /// gRPC client context + /// + public override Task UnsubscribeFromEvents(UnsubscribeRequest request, Grpc.Core.ServerCallContext context) + { + EventType[] eventTypes = null; + request.EventTypes.CopyTo(eventTypes, 0); + + EventManager.Instance.RemoveSubscriptionList(context.Peer, eventTypes.ToList()); + + return Task.FromResult(new SubscriptionResponse { Successful = true }); + } + + /// + /// RPC for unsubscribing from all events + /// + /// UnsubscribeAllRequest + /// gRPC client context + /// + public override Task UnsubscribeFromAll(UnsubscribeAllRequest request, Grpc.Core.ServerCallContext context) + { + EventManager.Instance.RemoveAllSubscriptions(context.Peer); + + return Task.FromResult(new SubscriptionResponse { Successful = true }); + } + } + +} \ No newline at end of file diff --git a/Aurora/RemoteImpl/RemotePartyImpl.cs b/Aurora/RemoteImpl/RemotePartyImpl.cs index 34bb207..d58e792 100644 --- a/Aurora/RemoteImpl/RemotePartyImpl.cs +++ b/Aurora/RemoteImpl/RemotePartyImpl.cs @@ -2,7 +2,10 @@ using System; using System.Threading.Tasks; using System.Collections.ObjectModel; using System.Linq; -using Aurora.Proto; +using Google.Protobuf.WellKnownTypes; +using Aurora.Proto.Party; +using Aurora.Proto.General; +using Aurora.Proto.Events; using Aurora.Models; using Aurora.Services; @@ -39,11 +42,24 @@ namespace Aurora.RemoteImpl public override Task JoinParty(JoinPartyRequest request, Grpc.Core.ServerCallContext context) { - _partyMembers.Add(new PartyMember() + PartyMember partyMember = new PartyMember() { UserName = request.UserName, IpAddress = context.Host, - }); + }; + + _partyMembers.Add(partyMember); + + BaseEvent e = new BaseEvent + { + EventType = EventType.PartyMemberJoined, + PartyMemberJoinedEvent = new PartyMemberJoinedEvent + { + Member = partyMember, + } + }; + + EventManager.Instance.PushEvent(e); JoinPartyResponse response = new JoinPartyResponse() { Status = PartyJoinedStatusEnum.Connected }; return Task.FromResult(response); @@ -51,12 +67,26 @@ namespace Aurora.RemoteImpl public override Task LeaveParty(LeavePartyRequest request, Grpc.Core.ServerCallContext context) { - _partyMembers.Remove(_partyMembers.Where(e => e.Id == request.ClientId).Single()); + PartyMember partyMember = _partyMembers.Where(e => e.Id == request.ClientId).Single(); + + _partyMembers.Remove(partyMember); + + BaseEvent bv = new BaseEvent + { + EventType = EventType.PartyMemberJoined, + PartyMemberLeftEvent = new PartyMemberLeftEvent + { + Member = partyMember, + } + }; + + EventManager.Instance.PushEvent(bv); + LeavePartyResponse response = new LeavePartyResponse() { Status = PartyJoinedStatusEnum.Disconnected }; return Task.FromResult(response); } - public override Task GetPartyMembers(Empty empty, Grpc.Core.ServerCallContext context) + public override Task GetPartyMembers(Proto.General.Empty empty, Grpc.Core.ServerCallContext context) { MembersResponse response = new MembersResponse(); response.Members.AddRange(_partyMembers); diff --git a/Aurora/RemoteImpl/RemotePlaybackImpl.cs b/Aurora/RemoteImpl/RemotePlaybackImpl.cs index 8470eb1..47b8d12 100644 --- a/Aurora/RemoteImpl/RemotePlaybackImpl.cs +++ b/Aurora/RemoteImpl/RemotePlaybackImpl.cs @@ -1,7 +1,8 @@ using System; using System.Threading.Tasks; using System.IO; -using Aurora.Proto; +using Aurora.Proto.Playback; +using Aurora.Proto.General; using Aurora.Models; namespace Aurora.RemoteImpl @@ -14,7 +15,7 @@ namespace Aurora.RemoteImpl } - public override async Task GetPartyStream(Empty empty, + public override Task GetPartyStream(Empty empty, Grpc.Core.IServerStreamWriter responseStream, Grpc.Core.ServerCallContext context) { diff --git a/Aurora/Services/EventManager.cs b/Aurora/Services/EventManager.cs new file mode 100755 index 0000000..2c179df --- /dev/null +++ b/Aurora/Services/EventManager.cs @@ -0,0 +1,294 @@ +using System; +using System.IO; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Collections.Specialized; +using System.Linq; +using System.Threading; +using Google.Protobuf.WellKnownTypes; +using Google.Protobuf.Reflection; +using Aurora.Proto.Events; +using Aurora.Models; + +namespace Aurora.Services +{ + public class EventManager : BaseService + { + //TODO purge inactive sessions from the list + public EventManager() + { + _eventQueues = new ConcurrentDictionary>(); + _subscriptionList = new Dictionary>(); + _unprocessedEventsList = new ObservableCollection(); + _unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed; + } + + #region Fields + private ConcurrentDictionary> _eventQueues; + private Dictionary> _subscriptionList; + private ObservableCollection _unprocessedEventsList; + + + #endregion Fields + + #region Private Methods + /// + /// Add event to appropriate events list for subscribed sessions + /// + /// + private void ProcessEvent(BaseEvent unprocessedEvent) + { + lock (_subscriptionList) + { + lock (_unprocessedEventsList) + { + //Duplicate events into client event queues + foreach (KeyValuePair> subscription in _subscriptionList) + { + //Active Events contains the session in question + if (_eventQueues.ContainsKey(subscription.Key)) + { + //Add all of the events to active events that are subscribed + + if (subscription.Value.Contains(unprocessedEvent.EventType)) + { + BlockingCollection eventList; + _eventQueues.TryGetValue(subscription.Key, out eventList); + if (eventList != null) + { + eventList.Add(unprocessedEvent); + } + } + } + } + + //Remove Event from list + _unprocessedEventsList.Remove(unprocessedEvent); + } + } + } + + #endregion Private Methods + + #region Public Methods + /// + /// Get the list of event type subscriptions for a given session id. + /// + /// Session Id + /// + public List GetSubscriptionList(string session) + { + List eventList = new List(); + if (_subscriptionList.ContainsKey(session)) + { + _subscriptionList.TryGetValue(session, out eventList); + } + + return eventList; + } + + /// + /// Get the number of event subscriptions for a given session + /// + /// Session Id + /// + public int GetSubscriptionCount(string session) + { + List eventList = new List(); + if (_subscriptionList.ContainsKey(session)) + { + _subscriptionList.TryGetValue(session, out eventList); + } + + return eventList.Count(); + } + + /// + /// Add a new subscription + /// + /// + /// + public bool AddSubscription(string session, EventType type) + { + bool success = false; + if (!_subscriptionList.ContainsKey(session)) + { + //Add session to subscription list + List eventList = new List(); + eventList.Add(type); + _subscriptionList.Add(session, eventList); + success = true; + //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); + } + else + { + List eventList; + _subscriptionList.TryGetValue(session, out eventList); + if (eventList != null) + { + eventList.Add(type); + success = true; + //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); + } + } + + //Add activeEvents if it doesn't exist + if (!_eventQueues.ContainsKey(session)) + { + //Add session to active events + _eventQueues.TryAdd(session, new BlockingCollection()); + } + + return success; + } + + /// + /// Add a list of subscriptions. This unsubscribes from unused events. + /// + /// The browser session id. + /// The list of event types to subscribe to. + public void AddSubscriptionList(string session, List types) + { + RemoveAllSubscriptions(session); + + foreach (EventType e in types) + { + AddSubscription(session, e); + } + } + + /// + /// Unsubscribe from a given event type. + /// + /// Session Id + /// Event Type to be removed + public void RemoveSubscription(string session, EventType type) + { + if (_subscriptionList.ContainsKey(session)) + { + List eventTypeList; + _subscriptionList.TryGetValue(session, out eventTypeList); + if (eventTypeList != null && eventTypeList.Contains(type)) + { + eventTypeList.Remove(type); + //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); + } + } + } + + public void RemoveSubscriptionList(string session, List types) + { + foreach (EventType e in types) + { + RemoveSubscription(session, e); + } + } + + /// + /// Remove all subscriptons for a given session. + /// + /// Session Id + public void RemoveAllSubscriptions(string session) + { + if (_subscriptionList.ContainsKey(session)) + { + _subscriptionList.Remove(session); + } + if (_eventQueues.ContainsKey(session)) + { + BlockingCollection rem = null; + _eventQueues.TryRemove(session, out rem); + } + + //base.LogInformation(string.Format("All subscriptions removed for event type {0}", session)); + } + + /// + /// Get a list of accumulated events for a given session. Timeout after 5 seconds if list is empty. + /// + /// + /// List + public List GetSessionEvents(string session) + { + BlockingCollection eList; + _eventQueues.TryGetValue(session, out eList); + + List returnList = new List(); + if (eList == null) + { + return returnList; + } + + //Continue to take until the eList is not empty. + while (true) + { + BaseEvent e; + eList.TryTake(out e); + if (e != null) + { + returnList.Add(e); + } + else + { + break; + } + } + + //In the event that eList was empty to begin with, wait for something to be appear or cancel + if (eList.Count == 0) + { + CancellationTokenSource tkSrc = new CancellationTokenSource(5000); + + BaseEvent e = null; + try + { + e = eList.Take(tkSrc.Token); + } + catch (OperationCanceledException) + { + // eat this + } + catch (Exception) + { + } + + if (e != null) + { + returnList.Add(e); + } + } + + return returnList; + } + + /// + /// Push a new event to the event queue. + /// + /// The event to be pushed + public void PushEvent(BaseEvent newEvent) + { + _unprocessedEventsList.Add(newEvent); + } + + #endregion Public Methods + + #region Events + private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e) + { + switch (e.Action) + { + case NotifyCollectionChangedAction.Add: + { + foreach (BaseEvent bEvent in e.NewItems) + { + ProcessEvent(bEvent); + } + break; + } + } + } + + #endregion Events + } +} \ No newline at end of file