EventManager rewritten to support a push arch instead of an internal poll. Crash finally fixed!!! The dangers of threading.
This commit is contained in:
@ -1,294 +0,0 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Collections.Specialized;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Google.Protobuf.Reflection;
|
||||
using Aurora.Proto.Events;
|
||||
using Aurora.Models;
|
||||
|
||||
namespace Aurora.Services
|
||||
{
|
||||
public class EventManager : BaseService<EventManager>
|
||||
{
|
||||
//TODO purge inactive sessions from the list
|
||||
public EventManager()
|
||||
{
|
||||
_eventQueues = new ConcurrentDictionary<string, BlockingCollection<BaseEvent>>();
|
||||
_subscriptionList = new Dictionary<string, List<EventType>>();
|
||||
_unprocessedEventsList = new ObservableCollection<BaseEvent>();
|
||||
_unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed;
|
||||
}
|
||||
|
||||
#region Fields
|
||||
private ConcurrentDictionary<string, BlockingCollection<BaseEvent>> _eventQueues;
|
||||
private Dictionary<string, List<EventType>> _subscriptionList;
|
||||
private ObservableCollection<BaseEvent> _unprocessedEventsList;
|
||||
|
||||
|
||||
#endregion Fields
|
||||
|
||||
#region Private Methods
|
||||
/// <summary>
|
||||
/// Add event to appropriate events list for subscribed sessions
|
||||
/// </summary>
|
||||
/// <param name="unprocessedEvent"></param>
|
||||
private void ProcessEvent(BaseEvent unprocessedEvent)
|
||||
{
|
||||
lock (_subscriptionList)
|
||||
{
|
||||
lock (_unprocessedEventsList)
|
||||
{
|
||||
//Duplicate events into client event queues
|
||||
foreach (KeyValuePair<string, List<EventType>> subscription in _subscriptionList)
|
||||
{
|
||||
//Active Events contains the session in question
|
||||
if (_eventQueues.ContainsKey(subscription.Key))
|
||||
{
|
||||
//Add all of the events to active events that are subscribed
|
||||
|
||||
if (subscription.Value.Contains(unprocessedEvent.EventType))
|
||||
{
|
||||
BlockingCollection<BaseEvent> eventList;
|
||||
_eventQueues.TryGetValue(subscription.Key, out eventList);
|
||||
if (eventList != null)
|
||||
{
|
||||
eventList.Add(unprocessedEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//Remove Event from list
|
||||
_unprocessedEventsList.Remove(unprocessedEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endregion Private Methods
|
||||
|
||||
#region Public Methods
|
||||
/// <summary>
|
||||
/// Get the list of event type subscriptions for a given session id.
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
/// <returns></returns>
|
||||
public List<EventType> GetSubscriptionList(string session)
|
||||
{
|
||||
List<EventType> eventList = new List<EventType>();
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
_subscriptionList.TryGetValue(session, out eventList);
|
||||
}
|
||||
|
||||
return eventList;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the number of event subscriptions for a given session
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
/// <returns></returns>
|
||||
public int GetSubscriptionCount(string session)
|
||||
{
|
||||
List<EventType> eventList = new List<EventType>();
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
_subscriptionList.TryGetValue(session, out eventList);
|
||||
}
|
||||
|
||||
return eventList.Count();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a new subscription
|
||||
/// </summary>
|
||||
/// <param name="session"></param>
|
||||
/// <param name="type"></param>
|
||||
public bool AddSubscription(string session, EventType type)
|
||||
{
|
||||
bool success = false;
|
||||
if (!_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
//Add session to subscription list
|
||||
List<EventType> eventList = new List<EventType>();
|
||||
eventList.Add(type);
|
||||
_subscriptionList.Add(session, eventList);
|
||||
success = true;
|
||||
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
|
||||
}
|
||||
else
|
||||
{
|
||||
List<EventType> eventList;
|
||||
_subscriptionList.TryGetValue(session, out eventList);
|
||||
if (eventList != null)
|
||||
{
|
||||
eventList.Add(type);
|
||||
success = true;
|
||||
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
|
||||
}
|
||||
}
|
||||
|
||||
//Add activeEvents if it doesn't exist
|
||||
if (!_eventQueues.ContainsKey(session))
|
||||
{
|
||||
//Add session to active events
|
||||
_eventQueues.TryAdd(session, new BlockingCollection<BaseEvent>());
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a list of subscriptions. This unsubscribes from unused events.
|
||||
/// </summary>
|
||||
/// <param name="session">The browser session id.</param>
|
||||
/// <param name="types">The list of event types to subscribe to.</param>
|
||||
public void AddSubscriptionList(string session, List<EventType> types)
|
||||
{
|
||||
RemoveAllSubscriptions(session);
|
||||
|
||||
foreach (EventType e in types)
|
||||
{
|
||||
AddSubscription(session, e);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribe from a given event type.
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
/// <param name="type">Event Type to be removed</param>
|
||||
public void RemoveSubscription(string session, EventType type)
|
||||
{
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
List<EventType> eventTypeList;
|
||||
_subscriptionList.TryGetValue(session, out eventTypeList);
|
||||
if (eventTypeList != null && eventTypeList.Contains(type))
|
||||
{
|
||||
eventTypeList.Remove(type);
|
||||
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveSubscriptionList(string session, List<EventType> types)
|
||||
{
|
||||
foreach (EventType e in types)
|
||||
{
|
||||
RemoveSubscription(session, e);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Remove all subscriptons for a given session.
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
public void RemoveAllSubscriptions(string session)
|
||||
{
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
_subscriptionList.Remove(session);
|
||||
}
|
||||
if (_eventQueues.ContainsKey(session))
|
||||
{
|
||||
BlockingCollection<BaseEvent> rem = null;
|
||||
_eventQueues.TryRemove(session, out rem);
|
||||
}
|
||||
|
||||
//base.LogInformation(string.Format("All subscriptions removed for event type {0}", session));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get a list of accumulated events for a given session. Timeout after 5 seconds if list is empty.
|
||||
/// </summary>
|
||||
/// <param name="session"></param>
|
||||
/// <returns>List<BaseEvent></returns>
|
||||
public List<BaseEvent> GetSessionEvents(string session)
|
||||
{
|
||||
BlockingCollection<BaseEvent> eList;
|
||||
_eventQueues.TryGetValue(session, out eList);
|
||||
|
||||
List<BaseEvent> returnList = new List<BaseEvent>();
|
||||
if (eList == null)
|
||||
{
|
||||
return returnList;
|
||||
}
|
||||
|
||||
//Continue to take until the eList is not empty.
|
||||
while (true)
|
||||
{
|
||||
BaseEvent e;
|
||||
eList.TryTake(out e);
|
||||
if (e != null)
|
||||
{
|
||||
returnList.Add(e);
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//In the event that eList was empty to begin with, wait for something to be appear or cancel
|
||||
if (eList.Count == 0)
|
||||
{
|
||||
CancellationTokenSource tkSrc = new CancellationTokenSource(5000);
|
||||
|
||||
BaseEvent e = null;
|
||||
try
|
||||
{
|
||||
e = eList.Take(tkSrc.Token);
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// eat this
|
||||
}
|
||||
catch (Exception)
|
||||
{
|
||||
}
|
||||
|
||||
if (e != null)
|
||||
{
|
||||
returnList.Add(e);
|
||||
}
|
||||
}
|
||||
|
||||
return returnList;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Push a new event to the event queue.
|
||||
/// </summary>
|
||||
/// <param name="newEvent">The event to be pushed</param>
|
||||
public void PushEvent(BaseEvent newEvent)
|
||||
{
|
||||
_unprocessedEventsList.Add(newEvent);
|
||||
}
|
||||
|
||||
#endregion Public Methods
|
||||
|
||||
#region Events
|
||||
private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e)
|
||||
{
|
||||
switch (e.Action)
|
||||
{
|
||||
case NotifyCollectionChangedAction.Add:
|
||||
{
|
||||
foreach (BaseEvent bEvent in e.NewItems)
|
||||
{
|
||||
ProcessEvent(bEvent);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endregion Events
|
||||
}
|
||||
}
|
208
Aurora/Services/EventManager/EventManager.cs
Normal file
208
Aurora/Services/EventManager/EventManager.cs
Normal file
@ -0,0 +1,208 @@
|
||||
using System;
|
||||
using System.IO;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Collections.ObjectModel;
|
||||
using System.Collections.Specialized;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using Google.Protobuf.Reflection;
|
||||
using Aurora.Proto.Events;
|
||||
using Aurora.Models;
|
||||
|
||||
namespace Aurora.Services.EventManager
|
||||
{
|
||||
public class EventManager : BaseService<EventManager>
|
||||
{
|
||||
#region Fields
|
||||
private Dictionary<string, List<EventType>> _subscriptionList;
|
||||
private Dictionary<string, Action<BaseEvent>> _actionList;
|
||||
|
||||
#endregion Fields
|
||||
public EventManager()
|
||||
{
|
||||
_subscriptionList = new Dictionary<string, List<EventType>>();
|
||||
_actionList = new Dictionary<string, Action<BaseEvent>>();
|
||||
}
|
||||
|
||||
#region Private Methods
|
||||
|
||||
|
||||
#endregion Private Methods
|
||||
|
||||
#region Public Methods
|
||||
/// <summary>
|
||||
/// Get the list of event type subscriptions for a given session id.
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
/// <returns></returns>
|
||||
public List<EventType> GetSubscriptionList(string session)
|
||||
{
|
||||
List<EventType> eventList = new List<EventType>();
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
_subscriptionList.TryGetValue(session, out eventList);
|
||||
}
|
||||
|
||||
return eventList;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Get the number of event subscriptions for a given session
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
/// <returns></returns>
|
||||
public int GetSubscriptionCount(string session)
|
||||
{
|
||||
List<EventType> eventList = new List<EventType>();
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
_subscriptionList.TryGetValue(session, out eventList);
|
||||
}
|
||||
|
||||
return eventList.Count();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a new subscription
|
||||
/// </summary>
|
||||
/// <param name="session"></param>
|
||||
/// <param name="type"></param>
|
||||
public bool AddSubscription(string session, EventType type)
|
||||
{
|
||||
bool success = false;
|
||||
lock (_subscriptionList)
|
||||
{
|
||||
if (!_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
//Add session to subscription list
|
||||
List<EventType> eventList = new List<EventType>();
|
||||
eventList.Add(type);
|
||||
_subscriptionList.Add(session, eventList);
|
||||
success = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
_subscriptionList.TryGetValue(session, out List<EventType> eventList);
|
||||
if (eventList != null)
|
||||
{
|
||||
eventList.Add(type);
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return success;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Add a list of subscriptions. This unsubscribes from unused events.
|
||||
/// </summary>
|
||||
/// <param name="session">The browser session id.</param>
|
||||
/// <param name="types">The list of event types to subscribe to.</param>
|
||||
public void AddSubscriptionList(string session, List<EventType> types)
|
||||
{
|
||||
RemoveAllSubscriptions(session);
|
||||
|
||||
foreach (EventType e in types)
|
||||
{
|
||||
AddSubscription(session, e);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Unsubscribe from a given event type.
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
/// <param name="type">Event Type to be removed</param>
|
||||
public void RemoveSubscription(string session, EventType type)
|
||||
{
|
||||
lock (_subscriptionList)
|
||||
{
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
List<EventType> eventTypeList;
|
||||
_subscriptionList.TryGetValue(session, out eventTypeList);
|
||||
if (eventTypeList != null && eventTypeList.Contains(type))
|
||||
{
|
||||
eventTypeList.Remove(type);
|
||||
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveSubscriptionList(string session, List<EventType> types)
|
||||
{
|
||||
foreach (EventType e in types)
|
||||
{
|
||||
RemoveSubscription(session, e);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Remove all subscriptons for a given session.
|
||||
/// </summary>
|
||||
/// <param name="session">Session Id</param>
|
||||
public void RemoveAllSubscriptions(string session)
|
||||
{
|
||||
if (_subscriptionList.ContainsKey(session))
|
||||
{
|
||||
_subscriptionList.Remove(session);
|
||||
}
|
||||
}
|
||||
|
||||
public void AddEventHandler(Action<BaseEvent> action, string sessionId)
|
||||
{
|
||||
lock (_actionList)
|
||||
{
|
||||
_actionList.Add(sessionId, action);
|
||||
}
|
||||
}
|
||||
|
||||
public void RemoveEventHandler(string sessionId)
|
||||
{
|
||||
_actionList.Remove(sessionId);
|
||||
}
|
||||
|
||||
public void FireEvent(BaseEvent bEvent)
|
||||
{
|
||||
Dictionary<string, Action<BaseEvent>> actionsCopy = new Dictionary<string, Action<BaseEvent>>();
|
||||
//Copy actions list
|
||||
lock (_actionList)
|
||||
{
|
||||
foreach (KeyValuePair<string, Action<BaseEvent>> pair in _actionList)
|
||||
{
|
||||
actionsCopy.Add(pair.Key, pair.Value);
|
||||
}
|
||||
}
|
||||
|
||||
lock (_subscriptionList)
|
||||
{
|
||||
foreach (KeyValuePair<string, List<EventType>> pair in _subscriptionList)
|
||||
{
|
||||
Console.WriteLine("SERVER - Invoking action for client: " + pair.Key);
|
||||
|
||||
Task.Delay(1000);
|
||||
//If action list contains an action for id, invoke
|
||||
if (actionsCopy.ContainsKey(pair.Key))
|
||||
{
|
||||
actionsCopy.TryGetValue(pair.Key, out Action<BaseEvent> action);
|
||||
|
||||
ParameterizedThreadStart operation = new ParameterizedThreadStart(obj => action((BaseEvent)obj));
|
||||
Thread executionThread = new Thread(operation, 1024 * 1024);
|
||||
|
||||
executionThread.Start(bEvent);
|
||||
executionThread.Join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endregion Public Methods
|
||||
|
||||
}
|
||||
}
|
@ -86,7 +86,6 @@ namespace Aurora.Services
|
||||
RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl));
|
||||
RegisterService(RemoteEventService.BindService(_remoteEventImpl));
|
||||
}
|
||||
|
||||
_server.Start();
|
||||
}
|
||||
catch (Exception ex)
|
||||
|
Reference in New Issue
Block a user