using System; using System.IO; using System.Collections.Concurrent; using System.Collections.Generic; using System.Collections.ObjectModel; using System.Collections.Specialized; using System.Linq; using System.Threading; using Google.Protobuf.WellKnownTypes; using Google.Protobuf.Reflection; using Aurora.Proto.Events; using Aurora.Models; namespace Aurora.Services { public class EventManager : BaseService { //TODO purge inactive sessions from the list public EventManager() { _eventQueues = new ConcurrentDictionary>(); _subscriptionList = new Dictionary>(); _unprocessedEventsList = new ObservableCollection(); _unprocessedEventsList.CollectionChanged += UnprocessedEventsList_Changed; } #region Fields private ConcurrentDictionary> _eventQueues; private Dictionary> _subscriptionList; private ObservableCollection _unprocessedEventsList; #endregion Fields #region Private Methods /// /// Add event to appropriate events list for subscribed sessions /// /// private void ProcessEvent(BaseEvent unprocessedEvent) { lock (_subscriptionList) { lock (_unprocessedEventsList) { //Duplicate events into client event queues foreach (KeyValuePair> subscription in _subscriptionList) { //Active Events contains the session in question if (_eventQueues.ContainsKey(subscription.Key)) { //Add all of the events to active events that are subscribed if (subscription.Value.Contains(unprocessedEvent.EventType)) { BlockingCollection eventList; _eventQueues.TryGetValue(subscription.Key, out eventList); if (eventList != null) { eventList.Add(unprocessedEvent); } } } } //Remove Event from list _unprocessedEventsList.Remove(unprocessedEvent); } } } #endregion Private Methods #region Public Methods /// /// Get the list of event type subscriptions for a given session id. /// /// Session Id /// public List GetSubscriptionList(string session) { List eventList = new List(); if (_subscriptionList.ContainsKey(session)) { _subscriptionList.TryGetValue(session, out eventList); } return eventList; } /// /// Get the number of event subscriptions for a given session /// /// Session Id /// public int GetSubscriptionCount(string session) { List eventList = new List(); if (_subscriptionList.ContainsKey(session)) { _subscriptionList.TryGetValue(session, out eventList); } return eventList.Count(); } /// /// Add a new subscription /// /// /// public bool AddSubscription(string session, EventType type) { bool success = false; if (!_subscriptionList.ContainsKey(session)) { //Add session to subscription list List eventList = new List(); eventList.Add(type); _subscriptionList.Add(session, eventList); success = true; //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); } else { List eventList; _subscriptionList.TryGetValue(session, out eventList); if (eventList != null) { eventList.Add(type); success = true; //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); } } //Add activeEvents if it doesn't exist if (!_eventQueues.ContainsKey(session)) { //Add session to active events _eventQueues.TryAdd(session, new BlockingCollection()); } return success; } /// /// Add a list of subscriptions. This unsubscribes from unused events. /// /// The browser session id. /// The list of event types to subscribe to. public void AddSubscriptionList(string session, List types) { RemoveAllSubscriptions(session); foreach (EventType e in types) { AddSubscription(session, e); } } /// /// Unsubscribe from a given event type. /// /// Session Id /// Event Type to be removed public void RemoveSubscription(string session, EventType type) { if (_subscriptionList.ContainsKey(session)) { List eventTypeList; _subscriptionList.TryGetValue(session, out eventTypeList); if (eventTypeList != null && eventTypeList.Contains(type)) { eventTypeList.Remove(type); //base.LogInformation(string.Format("Subscription removed for event type {0} subscription on session {1}", type.ToString(), session)); } } } public void RemoveSubscriptionList(string session, List types) { foreach (EventType e in types) { RemoveSubscription(session, e); } } /// /// Remove all subscriptons for a given session. /// /// Session Id public void RemoveAllSubscriptions(string session) { if (_subscriptionList.ContainsKey(session)) { _subscriptionList.Remove(session); } if (_eventQueues.ContainsKey(session)) { BlockingCollection rem = null; _eventQueues.TryRemove(session, out rem); } //base.LogInformation(string.Format("All subscriptions removed for event type {0}", session)); } /// /// Get a list of accumulated events for a given session. Timeout after 5 seconds if list is empty. /// /// /// List public List GetSessionEvents(string session) { BlockingCollection eList; _eventQueues.TryGetValue(session, out eList); List returnList = new List(); if (eList == null) { return returnList; } //Continue to take until the eList is not empty. while (true) { BaseEvent e; eList.TryTake(out e); if (e != null) { returnList.Add(e); } else { break; } } //In the event that eList was empty to begin with, wait for something to be appear or cancel if (eList.Count == 0) { CancellationTokenSource tkSrc = new CancellationTokenSource(5000); BaseEvent e = null; try { e = eList.Take(tkSrc.Token); } catch (OperationCanceledException) { // eat this } catch (Exception) { } if (e != null) { returnList.Add(e); } } return returnList; } /// /// Push a new event to the event queue. /// /// The event to be pushed public void PushEvent(BaseEvent newEvent) { _unprocessedEventsList.Add(newEvent); } #endregion Public Methods #region Events private void UnprocessedEventsList_Changed(object sender, NotifyCollectionChangedEventArgs e) { switch (e.Action) { case NotifyCollectionChangedAction.Add: { foreach (BaseEvent bEvent in e.NewItems) { ProcessEvent(bEvent); } break; } } } #endregion Events } }