This repository has been archived on 2020-12-20. You can view files and clone it, but cannot push or open issues or pull requests.
aurora-sharp-desktop/Aurora/Services/EventManager.cs

294 lines
10 KiB
C#
Raw Normal View History

using System;
using System.IO;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Threading;
using Google.Protobuf.WellKnownTypes;
using Google.Protobuf.Reflection;
using Aurora.Proto.Events;
using Aurora.Models;
namespace Aurora.Services
{
public class EventManager : BaseService<EventManager>
{
//TODO purge inactive sessions from the list
public EventManager()
{
_eventQueues = new ConcurrentDictionary<string, BlockingCollection<BaseEvent>>();
_subscriptionList = new Dictionary<string, List<EventType>>();
_unprocessedEventsList = new ObservableCollection<BaseEvent>();
_unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed;
}
#region Fields
private ConcurrentDictionary<string, BlockingCollection<BaseEvent>> _eventQueues;
private Dictionary<string, List<EventType>> _subscriptionList;
private ObservableCollection<BaseEvent> _unprocessedEventsList;
#endregion Fields
#region Private Methods
/// <summary>
/// Add event to appropriate events list for subscribed sessions
/// </summary>
/// <param name="unprocessedEvent"></param>
private void ProcessEvent(BaseEvent unprocessedEvent)
{
lock (_subscriptionList)
{
lock (_unprocessedEventsList)
{
//Duplicate events into client event queues
foreach (KeyValuePair<string, List<EventType>> subscription in _subscriptionList)
{
//Active Events contains the session in question
if (_eventQueues.ContainsKey(subscription.Key))
{
//Add all of the events to active events that are subscribed
if (subscription.Value.Contains(unprocessedEvent.EventType))
{
BlockingCollection<BaseEvent> eventList;
_eventQueues.TryGetValue(subscription.Key, out eventList);
if (eventList != null)
{
eventList.Add(unprocessedEvent);
}
}
}
}
//Remove Event from list
_unprocessedEventsList.Remove(unprocessedEvent);
}
}
}
#endregion Private Methods
#region Public Methods
/// <summary>
/// Get the list of event type subscriptions for a given session id.
/// </summary>
/// <param name="session">Session Id</param>
/// <returns></returns>
public List<EventType> GetSubscriptionList(string session)
{
List<EventType> eventList = new List<EventType>();
if (_subscriptionList.ContainsKey(session))
{
_subscriptionList.TryGetValue(session, out eventList);
}
return eventList;
}
/// <summary>
/// Get the number of event subscriptions for a given session
/// </summary>
/// <param name="session">Session Id</param>
/// <returns></returns>
public int GetSubscriptionCount(string session)
{
List<EventType> eventList = new List<EventType>();
if (_subscriptionList.ContainsKey(session))
{
_subscriptionList.TryGetValue(session, out eventList);
}
return eventList.Count();
}
/// <summary>
/// Add a new subscription
/// </summary>
/// <param name="session"></param>
/// <param name="type"></param>
public bool AddSubscription(string session, EventType type)
{
bool success = false;
if (!_subscriptionList.ContainsKey(session))
{
//Add session to subscription list
List<EventType> eventList = new List<EventType>();
eventList.Add(type);
_subscriptionList.Add(session, eventList);
success = true;
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
}
else
{
List<EventType> eventList;
_subscriptionList.TryGetValue(session, out eventList);
if (eventList != null)
{
eventList.Add(type);
success = true;
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
}
}
//Add activeEvents if it doesn't exist
if (!_eventQueues.ContainsKey(session))
{
//Add session to active events
_eventQueues.TryAdd(session, new BlockingCollection<BaseEvent>());
}
return success;
}
/// <summary>
/// Add a list of subscriptions. This unsubscribes from unused events.
/// </summary>
/// <param name="session">The browser session id.</param>
/// <param name="types">The list of event types to subscribe to.</param>
public void AddSubscriptionList(string session, List<EventType> types)
{
RemoveAllSubscriptions(session);
foreach (EventType e in types)
{
AddSubscription(session, e);
}
}
/// <summary>
/// Unsubscribe from a given event type.
/// </summary>
/// <param name="session">Session Id</param>
/// <param name="type">Event Type to be removed</param>
public void RemoveSubscription(string session, EventType type)
{
if (_subscriptionList.ContainsKey(session))
{
List<EventType> eventTypeList;
_subscriptionList.TryGetValue(session, out eventTypeList);
if (eventTypeList != null && eventTypeList.Contains(type))
{
eventTypeList.Remove(type);
//base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session));
}
}
}
public void RemoveSubscriptionList(string session, List<EventType> types)
{
foreach (EventType e in types)
{
RemoveSubscription(session, e);
}
}
/// <summary>
/// Remove all subscriptons for a given session.
/// </summary>
/// <param name="session">Session Id</param>
public void RemoveAllSubscriptions(string session)
{
if (_subscriptionList.ContainsKey(session))
{
_subscriptionList.Remove(session);
}
if (_eventQueues.ContainsKey(session))
{
BlockingCollection<BaseEvent> rem = null;
_eventQueues.TryRemove(session, out rem);
}
//base.LogInformation(string.Format("All subscriptions removed for event type {0}", session));
}
/// <summary>
/// Get a list of accumulated events for a given session. Timeout after 5 seconds if list is empty.
/// </summary>
/// <param name="session"></param>
/// <returns>List<BaseEvent></returns>
public List<BaseEvent> GetSessionEvents(string session)
{
BlockingCollection<BaseEvent> eList;
_eventQueues.TryGetValue(session, out eList);
List<BaseEvent> returnList = new List<BaseEvent>();
if (eList == null)
{
return returnList;
}
//Continue to take until the eList is not empty.
while (true)
{
BaseEvent e;
eList.TryTake(out e);
if (e != null)
{
returnList.Add(e);
}
else
{
break;
}
}
//In the event that eList was empty to begin with, wait for something to be appear or cancel
if (eList.Count == 0)
{
CancellationTokenSource tkSrc = new CancellationTokenSource(5000);
BaseEvent e = null;
try
{
e = eList.Take(tkSrc.Token);
}
catch (OperationCanceledException)
{
// eat this
}
catch (Exception)
{
}
if (e != null)
{
returnList.Add(e);
}
}
return returnList;
}
/// <summary>
/// Push a new event to the event queue.
/// </summary>
/// <param name="newEvent">The event to be pushed</param>
public void PushEvent(BaseEvent newEvent)
{
_unprocessedEventsList.Add(newEvent);
}
#endregion Public Methods
#region Events
private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e)
{
switch (e.Action)
{
case NotifyCollectionChangedAction.Add:
{
foreach (BaseEvent bEvent in e.NewItems)
{
ProcessEvent(bEvent);
}
break;
}
}
}
#endregion Events
}
}