diff --git a/Aurora/Design/Views/Party/PartyViewModel.cs b/Aurora/Design/Views/Party/PartyViewModel.cs
index 9751f82..54dd646 100644
--- a/Aurora/Design/Views/Party/PartyViewModel.cs
+++ b/Aurora/Design/Views/Party/PartyViewModel.cs
@@ -31,7 +31,6 @@ namespace Aurora.Design.Views.Party
private RemotePartyService.RemotePartyServiceClient _remotePartyClient;
private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient;
private RemoteEventService.RemoteEventServiceClient _remoteEventsClient;
- private Task _eventsTask;
CancellationTokenSource _eventCancellationTokenSource;
@@ -129,7 +128,6 @@ namespace Aurora.Design.Views.Party
_remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel);
//Assign but don't start task
- _eventsTask = new Task(GetEvents);
_eventCancellationTokenSource = new CancellationTokenSource();
}
@@ -159,6 +157,7 @@ namespace Aurora.Design.Views.Party
try
{
+ Console.WriteLine(string.Format("CLIENT {0} - SubscribeToEvents called from client with id", SettingsService.Instance.ClientId));
_remoteEventsClient.SubscribeToEvents(req);
}
catch (Exception ex)
@@ -166,8 +165,7 @@ namespace Aurora.Design.Views.Party
Console.WriteLine("Error subscribing to events: " + ex.Message);
}
-
- _eventsTask.Start();
+ GetEvents();
}
@@ -200,22 +198,31 @@ namespace Aurora.Design.Views.Party
OnPropertyChanged("IsNotSelectingHost");
}
+ private void AddMember(PartyMember member)
+ {
+ Members.Add(member);
+ }
+
+
///
/// Asynchronous function for processing events off of the event stream.
///
///
private async void GetEvents()
{
+ Console.WriteLine(string.Format("CLIENT {0} - GetEvents called from client with id", SettingsService.Instance.ClientId));
using (AsyncServerStreamingCall eventStream = _remoteEventsClient
.GetEvents(new EventsRequest { ClientId = SettingsService.Instance.ClientId }))
{
while (!_eventCancellationTokenSource.Token.IsCancellationRequested &&
await eventStream.ResponseStream.MoveNext(_eventCancellationTokenSource.Token))
{
+ Console.WriteLine(string.Format("CLIENT {0} - Event received for client with id", SettingsService.Instance.ClientId));
try
{
//Convert derived event type
- BaseEvent e = eventStream.ResponseStream.Current;
+ BaseEvent e = new BaseEvent(eventStream.ResponseStream.Current);
+
switch (e.DerivedEventCase)
{
case BaseEvent.DerivedEventOneofCase.None:
@@ -225,7 +232,16 @@ namespace Aurora.Design.Views.Party
case BaseEvent.DerivedEventOneofCase.PartyMemberJoinedEvent:
{
PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent;
- Members.Add(derivedEvent.Member);
+ PartyMember member = new PartyMember
+ {
+ UserName = derivedEvent.Member.UserName,
+ Id = derivedEvent.Member.Id,
+ IpAddress = derivedEvent.Member.IpAddress,
+ Port = derivedEvent.Member.Port
+ };
+
+ AddMember(member);
+
break;
}
case BaseEvent.DerivedEventOneofCase.PartyMemberLeftEvent:
@@ -234,7 +250,7 @@ namespace Aurora.Design.Views.Party
var found = Members.Where(x => x.Id == derivedEvent.Member.Id);
foreach (PartyMember member in found)
{
- Members.Remove(member);
+ _members.Remove(member);
}
break;
}
@@ -245,6 +261,8 @@ namespace Aurora.Design.Views.Party
{
Console.WriteLine(string.Format("EXCEPTION --- " + ex.Message));
}
+
+ OnPropertyChanged("Members");
}
}
}
diff --git a/Aurora/RemoteImpl/RemoteEventImpl.cs b/Aurora/RemoteImpl/RemoteEventImpl.cs
index af5df1b..7183c33 100644
--- a/Aurora/RemoteImpl/RemoteEventImpl.cs
+++ b/Aurora/RemoteImpl/RemoteEventImpl.cs
@@ -1,10 +1,11 @@
using System;
using System.Threading.Tasks;
+using System.Threading;
using System.Collections.Generic;
-using System.Linq;
using Aurora.Services;
+using System.Linq;
+using Aurora.Services.EventManager;
using Aurora.Proto.Events;
-using Aurora.Proto.General;
namespace Aurora.RemoteImpl
{
@@ -24,24 +25,19 @@ namespace Aurora.RemoteImpl
///
public async override Task GetEvents(EventsRequest request, Grpc.Core.IServerStreamWriter responseStream, Grpc.Core.ServerCallContext context)
{
- try
+ string peerId = Combine(new string[] { context.Peer, request.ClientId });
+ Console.WriteLine(string.Format("SERVER - Events request received from peer: {0}", peerId));
+
+ AutoResetEvent are = new AutoResetEvent(false);
+ Action callback = (BaseEvent bEvent) =>
{
- while (EventManager.Instance.GetSubscriptionCount(Combine(new string[] { context.Peer, request.ClientId })) > 0)
- {
- Console.WriteLine("Peer " + context.Peer);
- //TODO this causes crashes when two or more members are connected
- //TODO Change this to events based stream instead of a poll based...
- List events = EventManager.Instance.GetSessionEvents(Combine(new string[] { context.Peer, request.ClientId }));
- foreach (BaseEvent currentEvent in events)
- {
- await responseStream.WriteAsync(currentEvent);
- }
- }
- }
- catch (Exception ex)
- {
- Console.WriteLine("Exception caught:" + ex.Message);
- }
+ Console.WriteLine(string.Format("SERVER - Event fired for peer: {0}", peerId));
+ responseStream.WriteAsync(bEvent);
+
+ };
+
+ EventManager.Instance.AddEventHandler(callback, Combine(new string[] { context.Peer, request.ClientId }));
+ are.WaitOne();
}
///
@@ -52,6 +48,7 @@ namespace Aurora.RemoteImpl
///
public override Task SubscribeToEvents(SubscribeRequest request, Grpc.Core.ServerCallContext context)
{
+ Console.WriteLine(string.Format("SERVER - Subscription from client with id: {0}", request.ClientId));
EventManager.Instance.AddSubscriptionList(Combine(new string[] { context.Peer, request.ClientId }), request.EventTypes.ToList());
return Task.FromResult(new SubscriptionResponse { Successful = true });
diff --git a/Aurora/RemoteImpl/RemotePartyImpl.cs b/Aurora/RemoteImpl/RemotePartyImpl.cs
index 08987ea..441bcec 100644
--- a/Aurora/RemoteImpl/RemotePartyImpl.cs
+++ b/Aurora/RemoteImpl/RemotePartyImpl.cs
@@ -2,12 +2,9 @@ using System;
using System.Threading.Tasks;
using System.Collections.ObjectModel;
using System.Linq;
-using Google.Protobuf.WellKnownTypes;
using Aurora.Proto.Party;
-using Aurora.Proto.General;
using Aurora.Proto.Events;
-using Aurora.Models;
-using Aurora.Services;
+using Aurora.Services.EventManager;
namespace Aurora.RemoteImpl
{
@@ -39,6 +36,8 @@ namespace Aurora.RemoteImpl
IpAddress = context.Host,
};
+ Console.WriteLine("SERVER - Client joined party: " + partyMember.Id);
+
_partyMembers.Add(partyMember);
BaseEvent e = new BaseEvent
@@ -50,7 +49,7 @@ namespace Aurora.RemoteImpl
}
};
- EventManager.Instance.PushEvent(e);
+ EventManager.Instance.FireEvent(e);
JoinPartyResponse response = new JoinPartyResponse() { Status = PartyJoinedStatusEnum.Connected, ClientId = partyMember.Id };
return Task.FromResult(response);
@@ -71,7 +70,7 @@ namespace Aurora.RemoteImpl
}
};
- EventManager.Instance.PushEvent(bv);
+ EventManager.Instance.FireEvent(bv);
LeavePartyResponse response = new LeavePartyResponse() { Status = PartyJoinedStatusEnum.Disconnected };
return Task.FromResult(response);
diff --git a/Aurora/Services/EventManager.cs b/Aurora/Services/EventManager.cs
deleted file mode 100755
index 2c179df..0000000
--- a/Aurora/Services/EventManager.cs
+++ /dev/null
@@ -1,294 +0,0 @@
-using System;
-using System.IO;
-using System.Collections.Concurrent;
-using System.Collections.Generic;
-using System.Collections.ObjectModel;
-using System.Collections.Specialized;
-using System.Linq;
-using System.Threading;
-using Google.Protobuf.WellKnownTypes;
-using Google.Protobuf.Reflection;
-using Aurora.Proto.Events;
-using Aurora.Models;
-
-namespace Aurora.Services
-{
- public class EventManager : BaseService
- {
- //TODO purge inactive sessions from the list
- public EventManager()
- {
- _eventQueues = new ConcurrentDictionary>();
- _subscriptionList = new Dictionary>();
- _unprocessedEventsList = new ObservableCollection();
- _unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed;
- }
-
- #region Fields
- private ConcurrentDictionary> _eventQueues;
- private Dictionary> _subscriptionList;
- private ObservableCollection _unprocessedEventsList;
-
-
- #endregion Fields
-
- #region Private Methods
- ///
- /// Add event to appropriate events list for subscribed sessions
- ///
- ///
- private void ProcessEvent(BaseEvent unprocessedEvent)
- {
- lock (_subscriptionList)
- {
- lock (_unprocessedEventsList)
- {
- //Duplicate events into client event queues
- foreach (KeyValuePair> subscription in _subscriptionList)
- {
- //Active Events contains the session in question
- if (_eventQueues.ContainsKey(subscription.Key))
- {
- //Add all of the events to active events that are subscribed
-
- if (subscription.Value.Contains(unprocessedEvent.EventType))
- {
- BlockingCollection eventList;
- _eventQueues.TryGetValue(subscription.Key, out eventList);
- if (eventList != null)
- {
- eventList.Add(unprocessedEvent);
- }
- }
- }
- }
-
- //Remove Event from list
- _unprocessedEventsList.Remove(unprocessedEvent);
- }
- }
- }
-
- #endregion Private Methods
-
- #region Public Methods
- ///
- /// Get the list of event type subscriptions for a given session id.
- ///
- /// Session Id
- ///
- public List GetSubscriptionList(string session)
- {
- List eventList = new List();
- if (_subscriptionList.ContainsKey(session))
- {
- _subscriptionList.TryGetValue(session, out eventList);
- }
-
- return eventList;
- }
-
- ///
- /// Get the number of event subscriptions for a given session
- ///
- /// Session Id
- ///
- public int GetSubscriptionCount(string session)
- {
- List eventList = new List();
- if (_subscriptionList.ContainsKey(session))
- {
- _subscriptionList.TryGetValue(session, out eventList);
- }
-
- return eventList.Count();
- }
-
- ///
- /// Add a new subscription
- ///
- ///
- ///
- public bool AddSubscription(string session, EventType type)
- {
- bool success = false;
- if (!_subscriptionList.ContainsKey(session))
- {
- //Add session to subscription list
- List eventList = new List();
- eventList.Add(type);
- _subscriptionList.Add(session, eventList);
- success = true;
- //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
- }
- else
- {
- List eventList;
- _subscriptionList.TryGetValue(session, out eventList);
- if (eventList != null)
- {
- eventList.Add(type);
- success = true;
- //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
- }
- }
-
- //Add activeEvents if it doesn't exist
- if (!_eventQueues.ContainsKey(session))
- {
- //Add session to active events
- _eventQueues.TryAdd(session, new BlockingCollection());
- }
-
- return success;
- }
-
- ///
- /// Add a list of subscriptions. This unsubscribes from unused events.
- ///
- /// The browser session id.
- /// The list of event types to subscribe to.
- public void AddSubscriptionList(string session, List types)
- {
- RemoveAllSubscriptions(session);
-
- foreach (EventType e in types)
- {
- AddSubscription(session, e);
- }
- }
-
- ///
- /// Unsubscribe from a given event type.
- ///
- /// Session Id
- /// Event Type to be removed
- public void RemoveSubscription(string session, EventType type)
- {
- if (_subscriptionList.ContainsKey(session))
- {
- List eventTypeList;
- _subscriptionList.TryGetValue(session, out eventTypeList);
- if (eventTypeList != null && eventTypeList.Contains(type))
- {
- eventTypeList.Remove(type);
- //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
- }
- }
- }
-
- public void RemoveSubscriptionList(string session, List types)
- {
- foreach (EventType e in types)
- {
- RemoveSubscription(session, e);
- }
- }
-
- ///
- /// Remove all subscriptons for a given session.
- ///
- /// Session Id
- public void RemoveAllSubscriptions(string session)
- {
- if (_subscriptionList.ContainsKey(session))
- {
- _subscriptionList.Remove(session);
- }
- if (_eventQueues.ContainsKey(session))
- {
- BlockingCollection rem = null;
- _eventQueues.TryRemove(session, out rem);
- }
-
- //base.LogInformation(string.Format("All subscriptions removed for event type {0}", session));
- }
-
- ///
- /// Get a list of accumulated events for a given session. Timeout after 5 seconds if list is empty.
- ///
- ///
- /// List
- public List GetSessionEvents(string session)
- {
- BlockingCollection eList;
- _eventQueues.TryGetValue(session, out eList);
-
- List returnList = new List();
- if (eList == null)
- {
- return returnList;
- }
-
- //Continue to take until the eList is not empty.
- while (true)
- {
- BaseEvent e;
- eList.TryTake(out e);
- if (e != null)
- {
- returnList.Add(e);
- }
- else
- {
- break;
- }
- }
-
- //In the event that eList was empty to begin with, wait for something to be appear or cancel
- if (eList.Count == 0)
- {
- CancellationTokenSource tkSrc = new CancellationTokenSource(5000);
-
- BaseEvent e = null;
- try
- {
- e = eList.Take(tkSrc.Token);
- }
- catch (OperationCanceledException)
- {
- // eat this
- }
- catch (Exception)
- {
- }
-
- if (e != null)
- {
- returnList.Add(e);
- }
- }
-
- return returnList;
- }
-
- ///
- /// Push a new event to the event queue.
- ///
- /// The event to be pushed
- public void PushEvent(BaseEvent newEvent)
- {
- _unprocessedEventsList.Add(newEvent);
- }
-
- #endregion Public Methods
-
- #region Events
- private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e)
- {
- switch (e.Action)
- {
- case NotifyCollectionChangedAction.Add:
- {
- foreach (BaseEvent bEvent in e.NewItems)
- {
- ProcessEvent(bEvent);
- }
- break;
- }
- }
- }
-
- #endregion Events
- }
-}
\ No newline at end of file
diff --git a/Aurora/Services/EventManager/EventManager.cs b/Aurora/Services/EventManager/EventManager.cs
new file mode 100644
index 0000000..5207817
--- /dev/null
+++ b/Aurora/Services/EventManager/EventManager.cs
@@ -0,0 +1,208 @@
+using System;
+using System.IO;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+using System.Collections.Specialized;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Google.Protobuf.WellKnownTypes;
+using Google.Protobuf.Reflection;
+using Aurora.Proto.Events;
+using Aurora.Models;
+
+namespace Aurora.Services.EventManager
+{
+ public class EventManager : BaseService
+ {
+ #region Fields
+ private Dictionary> _subscriptionList;
+ private Dictionary> _actionList;
+
+ #endregion Fields
+ public EventManager()
+ {
+ _subscriptionList = new Dictionary>();
+ _actionList = new Dictionary>();
+ }
+
+ #region Private Methods
+
+
+ #endregion Private Methods
+
+ #region Public Methods
+ ///
+ /// Get the list of event type subscriptions for a given session id.
+ ///
+ /// Session Id
+ ///
+ public List GetSubscriptionList(string session)
+ {
+ List eventList = new List();
+ if (_subscriptionList.ContainsKey(session))
+ {
+ _subscriptionList.TryGetValue(session, out eventList);
+ }
+
+ return eventList;
+ }
+
+ ///
+ /// Get the number of event subscriptions for a given session
+ ///
+ /// Session Id
+ ///
+ public int GetSubscriptionCount(string session)
+ {
+ List eventList = new List();
+ if (_subscriptionList.ContainsKey(session))
+ {
+ _subscriptionList.TryGetValue(session, out eventList);
+ }
+
+ return eventList.Count();
+ }
+
+ ///
+ /// Add a new subscription
+ ///
+ ///
+ ///
+ public bool AddSubscription(string session, EventType type)
+ {
+ bool success = false;
+ lock (_subscriptionList)
+ {
+ if (!_subscriptionList.ContainsKey(session))
+ {
+ //Add session to subscription list
+ List eventList = new List();
+ eventList.Add(type);
+ _subscriptionList.Add(session, eventList);
+ success = true;
+ }
+ else
+ {
+ _subscriptionList.TryGetValue(session, out List eventList);
+ if (eventList != null)
+ {
+ eventList.Add(type);
+ success = true;
+ }
+ }
+ }
+
+ return success;
+ }
+
+ ///
+ /// Add a list of subscriptions. This unsubscribes from unused events.
+ ///
+ /// The browser session id.
+ /// The list of event types to subscribe to.
+ public void AddSubscriptionList(string session, List types)
+ {
+ RemoveAllSubscriptions(session);
+
+ foreach (EventType e in types)
+ {
+ AddSubscription(session, e);
+ }
+ }
+
+ ///
+ /// Unsubscribe from a given event type.
+ ///
+ /// Session Id
+ /// Event Type to be removed
+ public void RemoveSubscription(string session, EventType type)
+ {
+ lock (_subscriptionList)
+ {
+ if (_subscriptionList.ContainsKey(session))
+ {
+ List eventTypeList;
+ _subscriptionList.TryGetValue(session, out eventTypeList);
+ if (eventTypeList != null && eventTypeList.Contains(type))
+ {
+ eventTypeList.Remove(type);
+ //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
+ }
+ }
+ }
+ }
+
+ public void RemoveSubscriptionList(string session, List types)
+ {
+ foreach (EventType e in types)
+ {
+ RemoveSubscription(session, e);
+ }
+ }
+
+ ///
+ /// Remove all subscriptons for a given session.
+ ///
+ /// Session Id
+ public void RemoveAllSubscriptions(string session)
+ {
+ if (_subscriptionList.ContainsKey(session))
+ {
+ _subscriptionList.Remove(session);
+ }
+ }
+
+ public void AddEventHandler(Action action, string sessionId)
+ {
+ lock (_actionList)
+ {
+ _actionList.Add(sessionId, action);
+ }
+ }
+
+ public void RemoveEventHandler(string sessionId)
+ {
+ _actionList.Remove(sessionId);
+ }
+
+ public void FireEvent(BaseEvent bEvent)
+ {
+ Dictionary> actionsCopy = new Dictionary>();
+ //Copy actions list
+ lock (_actionList)
+ {
+ foreach (KeyValuePair> pair in _actionList)
+ {
+ actionsCopy.Add(pair.Key, pair.Value);
+ }
+ }
+
+ lock (_subscriptionList)
+ {
+ foreach (KeyValuePair> pair in _subscriptionList)
+ {
+ Console.WriteLine("SERVER - Invoking action for client: " + pair.Key);
+
+ Task.Delay(1000);
+ //If action list contains an action for id, invoke
+ if (actionsCopy.ContainsKey(pair.Key))
+ {
+ actionsCopy.TryGetValue(pair.Key, out Action action);
+
+ ParameterizedThreadStart operation = new ParameterizedThreadStart(obj => action((BaseEvent)obj));
+ Thread executionThread = new Thread(operation, 1024 * 1024);
+
+ executionThread.Start(bEvent);
+ executionThread.Join();
+ }
+ }
+ }
+
+ }
+
+ #endregion Public Methods
+
+ }
+}
\ No newline at end of file
diff --git a/Aurora/Services/ServerService.cs b/Aurora/Services/ServerService.cs
index ba51764..3862600 100644
--- a/Aurora/Services/ServerService.cs
+++ b/Aurora/Services/ServerService.cs
@@ -86,7 +86,6 @@ namespace Aurora.Services
RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl));
RegisterService(RemoteEventService.BindService(_remoteEventImpl));
}
-
_server.Start();
}
catch (Exception ex)