This repository has been archived on 2020-12-20. You can view files and clone it, but cannot push or open issues or pull requests.
watsonb8 823e1341ca First pass at events almost buttoned up.
The goal is to get the members list to update when new users enter and leave the party.
2019-07-07 17:12:13 -04:00

294 lines
10 KiB
C#
Executable File

using System;
using System.IO;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Threading;
using Google.Protobuf.WellKnownTypes;
using Google.Protobuf.Reflection;
using Aurora.Proto.Events;
using Aurora.Models;
namespace Aurora.Services
{
/// <summary>
/// Distributes pushed events to per-session queues according to each
/// session's event type subscriptions, and lets a session drain its queue
/// with a bounded wait.
/// </summary>
/// <remarks>
/// Every read and write of the subscription table and of the staging list is
/// serialized on one private lock, so subscriptions can be changed while
/// events are being pushed. <see cref="GetSessionEvents"/> deliberately does
/// not take that lock because it may block while waiting for an event.
/// </remarks>
public class EventManager : BaseService<EventManager>
{
    //TODO purge inactive sessions from the list
    public EventManager()
    {
        _eventQueues = new ConcurrentDictionary<string, BlockingCollection<BaseEvent>>();
        _subscriptionList = new Dictionary<string, List<EventType>>();
        _unprocessedEventsList = new ObservableCollection<BaseEvent>();
        _unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed;
    }

    #region Fields
    /// <summary>
    /// How long <see cref="GetSessionEvents"/> waits for a first event when
    /// the session queue is empty, in milliseconds.
    /// </summary>
    private const int EventWaitTimeoutMilliseconds = 5000;

    // Guards _subscriptionList and _unprocessedEventsList; neither type is thread-safe.
    private readonly object _syncRoot = new object();
    private readonly ConcurrentDictionary<string, BlockingCollection<BaseEvent>> _eventQueues;
    private readonly Dictionary<string, List<EventType>> _subscriptionList;
    private readonly ObservableCollection<BaseEvent> _unprocessedEventsList;
    #endregion Fields

    #region Private Methods
    /// <summary>
    /// Copy the event into the queue of every session subscribed to its
    /// event type, then remove it from the staging list.
    /// </summary>
    /// <param name="unprocessedEvent">The staged event to distribute.</param>
    private void ProcessEvent(BaseEvent unprocessedEvent)
    {
        lock (_syncRoot)
        {
            try
            {
                foreach (KeyValuePair<string, List<EventType>> subscription in _subscriptionList)
                {
                    if (!subscription.Value.Contains(unprocessedEvent.EventType))
                    {
                        continue;
                    }

                    BlockingCollection<BaseEvent> queue;
                    if (_eventQueues.TryGetValue(subscription.Key, out queue) && queue != null)
                    {
                        queue.Add(unprocessedEvent);
                    }
                }
            }
            finally
            {
                // Always unstage the event, even if distribution failed part-way,
                // so a faulty event cannot accumulate in the staging list.
                _unprocessedEventsList.Remove(unprocessedEvent);
            }
        }
    }
    #endregion Private Methods

    #region Public Methods
    /// <summary>
    /// Get the list of event type subscriptions for a given session id.
    /// </summary>
    /// <param name="session">Session Id</param>
    /// <returns>
    /// A snapshot copy of the session's subscribed event types; empty when the
    /// session has no subscriptions. Modifying it does not change the
    /// subscriptions.
    /// </returns>
    public List<EventType> GetSubscriptionList(string session)
    {
        lock (_syncRoot)
        {
            List<EventType> subscribed;
            if (_subscriptionList.TryGetValue(session, out subscribed) && subscribed != null)
            {
                return new List<EventType>(subscribed);
            }

            return new List<EventType>();
        }
    }

    /// <summary>
    /// Get the number of event subscriptions for a given session
    /// </summary>
    /// <param name="session">Session Id</param>
    /// <returns>The number of subscribed event types; 0 for an unknown session.</returns>
    public int GetSubscriptionCount(string session)
    {
        lock (_syncRoot)
        {
            List<EventType> subscribed;
            if (_subscriptionList.TryGetValue(session, out subscribed) && subscribed != null)
            {
                return subscribed.Count;
            }

            return 0;
        }
    }

    /// <summary>
    /// Add a new subscription and make sure the session has an event queue.
    /// </summary>
    /// <param name="session">Session Id</param>
    /// <param name="type">Event type to subscribe to</param>
    /// <returns>
    /// True once the subscription is in effect. Subscribing to an already
    /// subscribed type is a no-op that also returns true.
    /// </returns>
    public bool AddSubscription(string session, EventType type)
    {
        lock (_syncRoot)
        {
            List<EventType> subscribed;
            if (!_subscriptionList.TryGetValue(session, out subscribed) || subscribed == null)
            {
                subscribed = new List<EventType>();
                _subscriptionList[session] = subscribed;
            }

            // Store each type once so a single RemoveSubscription really
            // unsubscribes and GetSubscriptionCount is not inflated.
            if (!subscribed.Contains(type))
            {
                subscribed.Add(type);
            }

            // Create the session's queue only if it does not exist yet.
            _eventQueues.GetOrAdd(session, key => new BlockingCollection<BaseEvent>());
            return true;
        }
    }

    /// <summary>
    /// Add a list of subscriptions. This unsubscribes from unused events.
    /// </summary>
    /// <remarks>
    /// The session's existing subscriptions and its event queue are removed
    /// first, so events still queued for the session are discarded. The whole
    /// replacement happens under the lock, so a concurrent
    /// <see cref="PushEvent"/> never observes the intermediate state.
    /// </remarks>
    /// <param name="session">The browser session id.</param>
    /// <param name="types">The list of event types to subscribe to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="types"/> is null.</exception>
    public void AddSubscriptionList(string session, List<EventType> types)
    {
        // Validate before touching state: failing later would leave the
        // session stripped of the subscriptions it already had.
        if (types == null)
        {
            throw new ArgumentNullException(nameof(types));
        }

        lock (_syncRoot)
        {
            RemoveAllSubscriptions(session);
            foreach (EventType e in types)
            {
                AddSubscription(session, e);
            }
        }
    }

    /// <summary>
    /// Unsubscribe from a given event type.
    /// </summary>
    /// <param name="session">Session Id</param>
    /// <param name="type">Event Type to be removed</param>
    public void RemoveSubscription(string session, EventType type)
    {
        lock (_syncRoot)
        {
            List<EventType> subscribed;
            if (_subscriptionList.TryGetValue(session, out subscribed) && subscribed != null)
            {
                // List.Remove is a no-op when the type is not present.
                subscribed.Remove(type);
            }
        }
    }

    /// <summary>
    /// Unsubscribe from each of the given event types.
    /// </summary>
    /// <param name="session">Session Id</param>
    /// <param name="types">Event types to be removed</param>
    /// <exception cref="ArgumentNullException"><paramref name="types"/> is null.</exception>
    public void RemoveSubscriptionList(string session, List<EventType> types)
    {
        if (types == null)
        {
            throw new ArgumentNullException(nameof(types));
        }

        lock (_syncRoot)
        {
            foreach (EventType e in types)
            {
                RemoveSubscription(session, e);
            }
        }
    }

    /// <summary>
    /// Remove all subscriptions for a given session, together with its event queue.
    /// </summary>
    /// <param name="session">Session Id</param>
    public void RemoveAllSubscriptions(string session)
    {
        lock (_syncRoot)
        {
            _subscriptionList.Remove(session);

            BlockingCollection<BaseEvent> removed;
            _eventQueues.TryRemove(session, out removed);
        }
    }

    /// <summary>
    /// Get a list of accumulated events for a given session. If nothing is
    /// queued, wait up to 5 seconds for one event to arrive.
    /// </summary>
    /// <param name="session">Session Id</param>
    /// <returns>
    /// The queued events, a single event that arrived during the wait, or an
    /// empty list when the session has no queue or the wait timed out.
    /// </returns>
    public List<BaseEvent> GetSessionEvents(string session)
    {
        List<BaseEvent> returnList = new List<BaseEvent>();

        BlockingCollection<BaseEvent> queue;
        if (!_eventQueues.TryGetValue(session, out queue) || queue == null)
        {
            return returnList;
        }

        // Drain everything that is already queued without blocking.
        BaseEvent pending;
        while (queue.TryTake(out pending))
        {
            if (pending != null)
            {
                returnList.Add(pending);
            }
        }

        // Only wait when there was nothing to return. Checking the queue's own
        // count here would always see an empty queue after the drain above and
        // delay callers that already have events in hand.
        if (returnList.Count == 0)
        {
            // The timed TryTake needs no CancellationTokenSource (nothing to
            // dispose) and reports a timeout through its return value.
            if (queue.TryTake(out pending, EventWaitTimeoutMilliseconds) && pending != null)
            {
                returnList.Add(pending);
            }
        }

        return returnList;
    }

    /// <summary>
    /// Push a new event to the event queue.
    /// </summary>
    /// <param name="newEvent">The event to be pushed</param>
    /// <exception cref="ArgumentNullException"><paramref name="newEvent"/> is null.</exception>
    public void PushEvent(BaseEvent newEvent)
    {
        if (newEvent == null)
        {
            throw new ArgumentNullException(nameof(newEvent));
        }

        // ObservableCollection is not thread-safe. Adding under the lock also
        // runs the CollectionChanged handler (and so ProcessEvent) under it;
        // the lock is re-entrant on the same thread.
        lock (_syncRoot)
        {
            _unprocessedEventsList.Add(newEvent);
        }
    }
    #endregion Public Methods

    #region Events
    /// <summary>
    /// Distributes every event newly added to the staging list. Other
    /// collection changes (including the removal done by
    /// <see cref="ProcessEvent"/>) are ignored.
    /// </summary>
    private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e)
    {
        if (e.Action != NotifyCollectionChangedAction.Add || e.NewItems == null)
        {
            return;
        }

        foreach (BaseEvent bEvent in e.NewItems)
        {
            ProcessEvent(bEvent);
        }
    }
    #endregion Events
}
}