From 00b39b1d84cd6e65b934fa27dd988fea5cb5bb5d Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Mon, 15 Jul 2019 12:14:38 -0400 Subject: [PATCH] EventManager rewritten to support a push arch instead of an internal poll. Crash finally fixed!!! The dangers of threading. --- Aurora/Design/Views/Party/PartyViewModel.cs | 32 +- Aurora/RemoteImpl/RemoteEventImpl.cs | 35 +-- Aurora/RemoteImpl/RemotePartyImpl.cs | 11 +- Aurora/Services/EventManager.cs | 294 ------------------- Aurora/Services/EventManager/EventManager.cs | 208 +++++++++++++ Aurora/Services/ServerService.cs | 1 - 6 files changed, 254 insertions(+), 327 deletions(-) delete mode 100755 Aurora/Services/EventManager.cs create mode 100644 Aurora/Services/EventManager/EventManager.cs diff --git a/Aurora/Design/Views/Party/PartyViewModel.cs b/Aurora/Design/Views/Party/PartyViewModel.cs index 9751f82..54dd646 100644 --- a/Aurora/Design/Views/Party/PartyViewModel.cs +++ b/Aurora/Design/Views/Party/PartyViewModel.cs @@ -31,7 +31,6 @@ namespace Aurora.Design.Views.Party private RemotePartyService.RemotePartyServiceClient _remotePartyClient; private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient; private RemoteEventService.RemoteEventServiceClient _remoteEventsClient; - private Task _eventsTask; CancellationTokenSource _eventCancellationTokenSource; @@ -129,7 +128,6 @@ namespace Aurora.Design.Views.Party _remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel); //Assign but don't start task - _eventsTask = new Task(GetEvents); _eventCancellationTokenSource = new CancellationTokenSource(); } @@ -159,6 +157,7 @@ namespace Aurora.Design.Views.Party try { + Console.WriteLine(string.Format("CLIENT {0} - SubscribeToEvents called from client with id", SettingsService.Instance.ClientId)); _remoteEventsClient.SubscribeToEvents(req); } catch (Exception ex) @@ -166,8 +165,7 @@ namespace Aurora.Design.Views.Party Console.WriteLine("Error subscribing to events: " + ex.Message); } - - _eventsTask.Start(); + GetEvents(); } @@ -200,22 +198,31 @@ namespace Aurora.Design.Views.Party OnPropertyChanged("IsNotSelectingHost"); } + private void AddMember(PartyMember member) + { + Members.Add(member); + } + + /// /// Asynchronous function for processing events off of the event stream. /// /// private async void GetEvents() { + Console.WriteLine(string.Format("CLIENT {0} - GetEvents called from client with id", SettingsService.Instance.ClientId)); using (AsyncServerStreamingCall eventStream = _remoteEventsClient .GetEvents(new EventsRequest { ClientId = SettingsService.Instance.ClientId })) { while (!_eventCancellationTokenSource.Token.IsCancellationRequested && await eventStream.ResponseStream.MoveNext(_eventCancellationTokenSource.Token)) { + Console.WriteLine(string.Format("CLIENT {0} - Event received for client with id", SettingsService.Instance.ClientId)); try { //Convert derived event type - BaseEvent e = eventStream.ResponseStream.Current; + BaseEvent e = new BaseEvent(eventStream.ResponseStream.Current); + switch (e.DerivedEventCase) { case BaseEvent.DerivedEventOneofCase.None: @@ -225,7 +232,16 @@ namespace Aurora.Design.Views.Party case BaseEvent.DerivedEventOneofCase.PartyMemberJoinedEvent: { PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent; - Members.Add(derivedEvent.Member); + PartyMember member = new PartyMember + { + UserName = derivedEvent.Member.UserName, + Id = derivedEvent.Member.Id, + IpAddress = derivedEvent.Member.IpAddress, + Port = derivedEvent.Member.Port + }; + + AddMember(member); + break; } case BaseEvent.DerivedEventOneofCase.PartyMemberLeftEvent: @@ -234,7 +250,7 @@ namespace Aurora.Design.Views.Party var found = Members.Where(x => x.Id == derivedEvent.Member.Id); foreach (PartyMember member in found) { - Members.Remove(member); + _members.Remove(member); } break; } @@ -245,6 +261,8 @@ namespace Aurora.Design.Views.Party { Console.WriteLine(string.Format("EXCEPTION --- " + ex.Message)); } + + OnPropertyChanged("Members"); } } } diff --git a/Aurora/RemoteImpl/RemoteEventImpl.cs b/Aurora/RemoteImpl/RemoteEventImpl.cs index af5df1b..7183c33 100644 --- a/Aurora/RemoteImpl/RemoteEventImpl.cs +++ b/Aurora/RemoteImpl/RemoteEventImpl.cs @@ -1,10 +1,11 @@ using System; using System.Threading.Tasks; +using System.Threading; using System.Collections.Generic; -using System.Linq; using Aurora.Services; +using System.Linq; +using Aurora.Services.EventManager; using Aurora.Proto.Events; -using Aurora.Proto.General; namespace Aurora.RemoteImpl { @@ -24,24 +25,19 @@ namespace Aurora.RemoteImpl /// public async override Task GetEvents(EventsRequest request, Grpc.Core.IServerStreamWriter responseStream, Grpc.Core.ServerCallContext context) { - try + string peerId = Combine(new string[] { context.Peer, request.ClientId }); + Console.WriteLine(string.Format("SERVER - Events request received from peer: {0}", peerId)); + + AutoResetEvent are = new AutoResetEvent(false); + Action callback = (BaseEvent bEvent) => { - while (EventManager.Instance.GetSubscriptionCount(Combine(new string[] { context.Peer, request.ClientId })) > 0) - { - Console.WriteLine("Peer " + context.Peer); - //TODO this causes crashes when two or more members are connected - //TODO Change this to events based stream instead of a poll based... - List events = EventManager.Instance.GetSessionEvents(Combine(new string[] { context.Peer, request.ClientId })); - foreach (BaseEvent currentEvent in events) - { - await responseStream.WriteAsync(currentEvent); - } - } - } - catch (Exception ex) - { - Console.WriteLine("Exception caught:" + ex.Message); - } + Console.WriteLine(string.Format("SERVER - Event fired for peer: {0}", peerId)); + responseStream.WriteAsync(bEvent); + + }; + + EventManager.Instance.AddEventHandler(callback, Combine(new string[] { context.Peer, request.ClientId })); + are.WaitOne(); } /// @@ -52,6 +48,7 @@ namespace Aurora.RemoteImpl /// public override Task SubscribeToEvents(SubscribeRequest request, Grpc.Core.ServerCallContext context) { + Console.WriteLine(string.Format("SERVER - Subscription from client with id: {0}", request.ClientId)); EventManager.Instance.AddSubscriptionList(Combine(new string[] { context.Peer, request.ClientId }), request.EventTypes.ToList()); return Task.FromResult(new SubscriptionResponse { Successful = true }); diff --git a/Aurora/RemoteImpl/RemotePartyImpl.cs b/Aurora/RemoteImpl/RemotePartyImpl.cs index 08987ea..441bcec 100644 --- a/Aurora/RemoteImpl/RemotePartyImpl.cs +++ b/Aurora/RemoteImpl/RemotePartyImpl.cs @@ -2,12 +2,9 @@ using System; using System.Threading.Tasks; using System.Collections.ObjectModel; using System.Linq; -using Google.Protobuf.WellKnownTypes; using Aurora.Proto.Party; -using Aurora.Proto.General; using Aurora.Proto.Events; -using Aurora.Models; -using Aurora.Services; +using Aurora.Services.EventManager; namespace Aurora.RemoteImpl { @@ -39,6 +36,8 @@ namespace Aurora.RemoteImpl IpAddress = context.Host, }; + Console.WriteLine("SERVER - Client joined party: " + partyMember.Id); + _partyMembers.Add(partyMember); BaseEvent e = new BaseEvent @@ -50,7 +49,7 @@ namespace Aurora.RemoteImpl } }; - EventManager.Instance.PushEvent(e); + EventManager.Instance.FireEvent(e); JoinPartyResponse response = new JoinPartyResponse() { Status = PartyJoinedStatusEnum.Connected, ClientId = partyMember.Id }; return Task.FromResult(response); @@ -71,7 +70,7 @@ namespace Aurora.RemoteImpl } }; - EventManager.Instance.PushEvent(bv); + EventManager.Instance.FireEvent(bv); LeavePartyResponse response = new LeavePartyResponse() { Status = PartyJoinedStatusEnum.Disconnected }; return Task.FromResult(response); diff --git a/Aurora/Services/EventManager.cs b/Aurora/Services/EventManager.cs deleted file mode 100755 index 2c179df..0000000 --- a/Aurora/Services/EventManager.cs +++ /dev/null @@ -1,294 +0,0 @@ -using System; -using System.IO; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.Collections.Specialized; -using System.Linq; -using System.Threading; -using Google.Protobuf.WellKnownTypes; -using Google.Protobuf.Reflection; -using Aurora.Proto.Events; -using Aurora.Models; - -namespace Aurora.Services -{ - public class EventManager : BaseService - { - //TODO purge inactive sessions from the list - public EventManager() - { - _eventQueues = new ConcurrentDictionary>(); - _subscriptionList = new Dictionary>(); - _unprocessedEventsList = new ObservableCollection(); - _unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed; - } - - #region Fields - private ConcurrentDictionary> _eventQueues; - private Dictionary> _subscriptionList; - private ObservableCollection _unprocessedEventsList; - - - #endregion Fields - - #region Private Methods - /// - /// Add event to appropriate events list for subscribed sessions - /// - /// - private void ProcessEvent(BaseEvent unprocessedEvent) - { - lock (_subscriptionList) - { - lock (_unprocessedEventsList) - { - //Duplicate events into client event queues - foreach (KeyValuePair> subscription in _subscriptionList) - { - //Active Events contains the session in question - if (_eventQueues.ContainsKey(subscription.Key)) - { - //Add all of the events to active events that are subscribed - - if (subscription.Value.Contains(unprocessedEvent.EventType)) - { - BlockingCollection eventList; - _eventQueues.TryGetValue(subscription.Key, out eventList); - if (eventList != null) - { - eventList.Add(unprocessedEvent); - } - } - } - } - - //Remove Event from list - _unprocessedEventsList.Remove(unprocessedEvent); - } - } - } - - #endregion Private Methods - - #region Public Methods - /// - /// Get the list of event type subscriptions for a given session id. - /// - /// Session Id - /// - public List GetSubscriptionList(string session) - { - List eventList = new List(); - if (_subscriptionList.ContainsKey(session)) - { - _subscriptionList.TryGetValue(session, out eventList); - } - - return eventList; - } - - /// - /// Get the number of event subscriptions for a given session - /// - /// Session Id - /// - public int GetSubscriptionCount(string session) - { - List eventList = new List(); - if (_subscriptionList.ContainsKey(session)) - { - _subscriptionList.TryGetValue(session, out eventList); - } - - return eventList.Count(); - } - - /// - /// Add a new subscription - /// - /// - /// - public bool AddSubscription(string session, EventType type) - { - bool success = false; - if (!_subscriptionList.ContainsKey(session)) - { - //Add session to subscription list - List eventList = new List(); - eventList.Add(type); - _subscriptionList.Add(session, eventList); - success = true; - //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); - } - else - { - List eventList; - _subscriptionList.TryGetValue(session, out eventList); - if (eventList != null) - { - eventList.Add(type); - success = true; - //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); - } - } - - //Add activeEvents if it doesn't exist - if (!_eventQueues.ContainsKey(session)) - { - //Add session to active events - _eventQueues.TryAdd(session, new BlockingCollection()); - } - - return success; - } - - /// - /// Add a list of subscriptions. This unsubscribes from unused events. - /// - /// The browser session id. - /// The list of event types to subscribe to. - public void AddSubscriptionList(string session, List types) - { - RemoveAllSubscriptions(session); - - foreach (EventType e in types) - { - AddSubscription(session, e); - } - } - - /// - /// Unsubscribe from a given event type. - /// - /// Session Id - /// Event Type to be removed - public void RemoveSubscription(string session, EventType type) - { - if (_subscriptionList.ContainsKey(session)) - { - List eventTypeList; - _subscriptionList.TryGetValue(session, out eventTypeList); - if (eventTypeList != null && eventTypeList.Contains(type)) - { - eventTypeList.Remove(type); - //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); - } - } - } - - public void RemoveSubscriptionList(string session, List types) - { - foreach (EventType e in types) - { - RemoveSubscription(session, e); - } - } - - /// - /// Remove all subscriptons for a given session. - /// - /// Session Id - public void RemoveAllSubscriptions(string session) - { - if (_subscriptionList.ContainsKey(session)) - { - _subscriptionList.Remove(session); - } - if (_eventQueues.ContainsKey(session)) - { - BlockingCollection rem = null; - _eventQueues.TryRemove(session, out rem); - } - - //base.LogInformation(string.Format("All subscriptions removed for event type {0}", session)); - } - - /// - /// Get a list of accumulated events for a given session. Timeout after 5 seconds if list is empty. - /// - /// - /// List - public List GetSessionEvents(string session) - { - BlockingCollection eList; - _eventQueues.TryGetValue(session, out eList); - - List returnList = new List(); - if (eList == null) - { - return returnList; - } - - //Continue to take until the eList is not empty. - while (true) - { - BaseEvent e; - eList.TryTake(out e); - if (e != null) - { - returnList.Add(e); - } - else - { - break; - } - } - - //In the event that eList was empty to begin with, wait for something to be appear or cancel - if (eList.Count == 0) - { - CancellationTokenSource tkSrc = new CancellationTokenSource(5000); - - BaseEvent e = null; - try - { - e = eList.Take(tkSrc.Token); - } - catch (OperationCanceledException) - { - // eat this - } - catch (Exception) - { - } - - if (e != null) - { - returnList.Add(e); - } - } - - return returnList; - } - - /// - /// Push a new event to the event queue. - /// - /// The event to be pushed - public void PushEvent(BaseEvent newEvent) - { - _unprocessedEventsList.Add(newEvent); - } - - #endregion Public Methods - - #region Events - private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e) - { - switch (e.Action) - { - case NotifyCollectionChangedAction.Add: - { - foreach (BaseEvent bEvent in e.NewItems) - { - ProcessEvent(bEvent); - } - break; - } - } - } - - #endregion Events - } -} \ No newline at end of file diff --git a/Aurora/Services/EventManager/EventManager.cs b/Aurora/Services/EventManager/EventManager.cs new file mode 100644 index 0000000..5207817 --- /dev/null +++ b/Aurora/Services/EventManager/EventManager.cs @@ -0,0 +1,208 @@ +using System; +using System.IO; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Collections.Specialized; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf.WellKnownTypes; +using Google.Protobuf.Reflection; +using Aurora.Proto.Events; +using Aurora.Models; + +namespace Aurora.Services.EventManager +{ + public class EventManager : BaseService + { + #region Fields + private Dictionary> _subscriptionList; + private Dictionary> _actionList; + + #endregion Fields + public EventManager() + { + _subscriptionList = new Dictionary>(); + _actionList = new Dictionary>(); + } + + #region Private Methods + + + #endregion Private Methods + + #region Public Methods + /// + /// Get the list of event type subscriptions for a given session id. + /// + /// Session Id + /// + public List GetSubscriptionList(string session) + { + List eventList = new List(); + if (_subscriptionList.ContainsKey(session)) + { + _subscriptionList.TryGetValue(session, out eventList); + } + + return eventList; + } + + /// + /// Get the number of event subscriptions for a given session + /// + /// Session Id + /// + public int GetSubscriptionCount(string session) + { + List eventList = new List(); + if (_subscriptionList.ContainsKey(session)) + { + _subscriptionList.TryGetValue(session, out eventList); + } + + return eventList.Count(); + } + + /// + /// Add a new subscription + /// + /// + /// + public bool AddSubscription(string session, EventType type) + { + bool success = false; + lock (_subscriptionList) + { + if (!_subscriptionList.ContainsKey(session)) + { + //Add session to subscription list + List eventList = new List(); + eventList.Add(type); + _subscriptionList.Add(session, eventList); + success = true; + } + else + { + _subscriptionList.TryGetValue(session, out List eventList); + if (eventList != null) + { + eventList.Add(type); + success = true; + } + } + } + + return success; + } + + /// + /// Add a list of subscriptions. This unsubscribes from unused events. + /// + /// The browser session id. + /// The list of event types to subscribe to. + public void AddSubscriptionList(string session, List types) + { + RemoveAllSubscriptions(session); + + foreach (EventType e in types) + { + AddSubscription(session, e); + } + } + + /// + /// Unsubscribe from a given event type. + /// + /// Session Id + /// Event Type to be removed + public void RemoveSubscription(string session, EventType type) + { + lock (_subscriptionList) + { + if (_subscriptionList.ContainsKey(session)) + { + List eventTypeList; + _subscriptionList.TryGetValue(session, out eventTypeList); + if (eventTypeList != null && eventTypeList.Contains(type)) + { + eventTypeList.Remove(type); + //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); + } + } + } + } + + public void RemoveSubscriptionList(string session, List types) + { + foreach (EventType e in types) + { + RemoveSubscription(session, e); + } + } + + /// + /// Remove all subscriptons for a given session. + /// + /// Session Id + public void RemoveAllSubscriptions(string session) + { + if (_subscriptionList.ContainsKey(session)) + { + _subscriptionList.Remove(session); + } + } + + public void AddEventHandler(Action action, string sessionId) + { + lock (_actionList) + { + _actionList.Add(sessionId, action); + } + } + + public void RemoveEventHandler(string sessionId) + { + _actionList.Remove(sessionId); + } + + public void FireEvent(BaseEvent bEvent) + { + Dictionary> actionsCopy = new Dictionary>(); + //Copy actions list + lock (_actionList) + { + foreach (KeyValuePair> pair in _actionList) + { + actionsCopy.Add(pair.Key, pair.Value); + } + } + + lock (_subscriptionList) + { + foreach (KeyValuePair> pair in _subscriptionList) + { + Console.WriteLine("SERVER - Invoking action for client: " + pair.Key); + + Task.Delay(1000); + //If action list contains an action for id, invoke + if (actionsCopy.ContainsKey(pair.Key)) + { + actionsCopy.TryGetValue(pair.Key, out Action action); + + ParameterizedThreadStart operation = new ParameterizedThreadStart(obj => action((BaseEvent)obj)); + Thread executionThread = new Thread(operation, 1024 * 1024); + + executionThread.Start(bEvent); + executionThread.Join(); + } + } + } + + } + + #endregion Public Methods + + } +} \ No newline at end of file diff --git a/Aurora/Services/ServerService.cs b/Aurora/Services/ServerService.cs index ba51764..3862600 100644 --- a/Aurora/Services/ServerService.cs +++ b/Aurora/Services/ServerService.cs @@ -86,7 +86,6 @@ namespace Aurora.Services RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl)); RegisterService(RemoteEventService.BindService(_remoteEventImpl)); } - _server.Start(); } catch (Exception ex)