Merge branch 'feature/single_executor'

This commit is contained in:
watsonb8 2019-07-10 18:23:40 -04:00
commit 11a585ecc0
5 changed files with 199 additions and 382 deletions

View File

@ -1,8 +1,15 @@
using System; using System;
using System.Collections.ObjectModel; using System.Collections.ObjectModel;
using Aurora.Executors; using System.Threading.Tasks;
using Aurora.Proto.Party; using System.Threading;
using System.Linq;
using Xamarin.Forms; using Xamarin.Forms;
using Grpc.Core;
using Aurora.Services;
using Aurora.Proto.General;
using Aurora.Proto.Party;
using Aurora.Proto.Playback;
using Aurora.Proto.Events;
namespace Aurora.Design.Views.Party namespace Aurora.Design.Views.Party
{ {
@ -16,13 +23,17 @@ namespace Aurora.Design.Views.Party
public class PartyViewModel : BaseViewModel public class PartyViewModel : BaseViewModel
{ {
private PartyState _state; private PartyState _state;
private BaseExecutor _executor;
private string _hostname; private string _hostname;
private ObservableCollection<PartyMember> _members; private ObservableCollection<PartyMember> _members;
//Client fields
private Channel _channel;
private RemotePartyService.RemotePartyServiceClient _remotePartyClient;
private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient;
private RemoteEventService.RemoteEventServiceClient _remoteEventsClient;
private Task _eventsTask;
CancellationTokenSource _eventCancellationTokenSource;
public PartyViewModel() public PartyViewModel()
{ {
@ -34,15 +45,12 @@ namespace Aurora.Design.Views.Party
}; };
State(PartyState.SelectingHost); SetState(PartyState.SelectingHost);
} }
~PartyViewModel() ~PartyViewModel()
{ {
if (_executor != null) Task.Run(ServerService.Instance.Stop);
{
_executor.Close();
}
} }
#region Properties #region Properties
@ -80,24 +88,13 @@ namespace Aurora.Design.Views.Party
#endregion Properties #endregion Properties
private void State(PartyState state)
{
_state = state;
OnPropertyChanged("IsSelectingHost");
OnPropertyChanged("IsNotSelectingHost");
}
#region Commands #region Commands
private void OnJoinExecute() private void OnJoinExecute()
{ {
_executor = BaseExecutor.CreateExecutor<ClientExecutor>(); InitializeClients(Hostname, SettingsService.Instance.DefaultPort.ToString());
_executor.Connect(this.Hostname); JoinParty();
SetupMembersCollection(); SetState(PartyState.Connecting);
OnPropertyChanged("Members");
State(PartyState.Connecting);
} }
private bool CanJoinExecute() private bool CanJoinExecute()
@ -107,16 +104,13 @@ namespace Aurora.Design.Views.Party
private void OnHostExecute() private void OnHostExecute()
{ {
ServerService.Instance.Start();
//Instantiate and initialize all executors string localHost = ServerService.GetLocalIPAddress();
_executor = BaseExecutor.CreateExecutor<HostExecutor>(); InitializeClients(localHost, SettingsService.Instance.DefaultPort.ToString());
_executor.Connect(this.Hostname); JoinParty();
SetupMembersCollection();
OnPropertyChanged("Members");
//Change state //Change state
State(PartyState.Connecting); SetState(PartyState.Connecting);
} }
private bool CanHostExecute() private bool CanHostExecute()
@ -127,41 +121,123 @@ namespace Aurora.Design.Views.Party
#endregion Commands #endregion Commands
private void SetupMembersCollection() #region Private Methods
private void InitializeClients(string hostname, string port)
{ {
if (_executor != null) _channel = new Channel(string.Format("{0}:{1}", hostname, port), ChannelCredentials.Insecure);
{
foreach (PartyMember member in _executor.PartyMembers) _remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel);
{ _remotePlaybackClient = new RemotePlaybackService.RemotePlaybackServiceClient(_channel);
_members.Add(member); _remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel);
//Assign but don't start task
_eventsTask = new Task(GetEvents);
_eventCancellationTokenSource = new CancellationTokenSource();
} }
//Setup events /// <summary>
_executor.PartyMembers.CollectionChanged += (sender, e) => /// Join the remote party.
/// </summary>
/// <returns></returns>
private async void JoinParty()
{ {
switch (e.Action) await _remotePartyClient.JoinPartyAsync(new JoinPartyRequest
{ {
case System.Collections.Specialized.NotifyCollectionChangedAction.Add: UserName = SettingsService.Instance.Username,
});
RefreshMembers();
//Subscribe to events
SubscribeRequest req = new SubscribeRequest();
req.EventTypes.Add(EventType.PartyMemberJoined);
req.EventTypes.Add(EventType.PartyMemberLeft);
try
{ {
foreach (PartyMember member in e.NewItems) _remoteEventsClient.SubscribeToEvents(req);
}
catch (Exception ex)
{
Console.WriteLine("Error subscribing to events: " + ex.Message);
}
_eventsTask.Start();
}
/// <summary>
/// Refresh members list.
/// </summary>
private void RefreshMembers()
{
MembersResponse response = _remotePartyClient.GetPartyMembers(new Empty());
//Add members
foreach (PartyMember member in response.Members)
{ {
Members.Add(member); Members.Add(member);
OnPropertyChanged("Members");
} }
//Remove out of date members
// foreach (PartyMember member in Members)
// {
// if (!response.Members.Contains(member))
// {
// Members.Remove(member);
// }
// }
}
private void SetState(PartyState state)
{
_state = state;
OnPropertyChanged("IsSelectingHost");
OnPropertyChanged("IsNotSelectingHost");
}
/// <summary>
/// Asynchronous function for processing events off of the event stream.
/// </summary>
/// <returns></returns>
private async void GetEvents()
{
using (AsyncServerStreamingCall<BaseEvent> eventStream = _remoteEventsClient.GetEvents(new Empty()))
{
while (!_eventCancellationTokenSource.Token.IsCancellationRequested)
{
if (await eventStream.ResponseStream.MoveNext(_eventCancellationTokenSource.Token))
{
//Convert derived event type
BaseEvent e = eventStream.ResponseStream.Current;
switch (e.DerivedEventCase)
{
case BaseEvent.DerivedEventOneofCase.None:
{
throw new InvalidOperationException();
}
case BaseEvent.DerivedEventOneofCase.PartyMemberJoinedEvent:
{
PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent;
Members.Add(derivedEvent.Member);
break; break;
} }
case System.Collections.Specialized.NotifyCollectionChangedAction.Remove: case BaseEvent.DerivedEventOneofCase.PartyMemberLeftEvent:
{ {
foreach (PartyMember member in e.OldItems) PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent;
var found = Members.Where(x => x.Id == derivedEvent.Member.Id);
foreach (PartyMember member in found)
{ {
Members.Remove(member); Members.Remove(member);
OnPropertyChanged("Members");
} }
break; break;
} }
} }
};
} }
} }
} }
} }
#endregion Private Methods
}
}

View File

@ -1,58 +0,0 @@
using System;
using System.Reflection;
using System.Linq;
using System.Collections.ObjectModel;
using Aurora.Proto.Party;
namespace Aurora.Executors
{
public abstract class BaseExecutor
{
protected BaseExecutor()
{
}
public Type ExecutorType { get; protected set; }
public abstract ObservableCollection<PartyMember> PartyMembers { get; }
public static BaseExecutor CreateExecutor<T>()
{
BaseExecutor executor = null;
if (typeof(T) == typeof(HostExecutor))
{
executor = new HostExecutor();
executor.ExecutorType = typeof(HostExecutor);
}
else if (typeof(T) == typeof(ClientExecutor))
{
executor = new ClientExecutor();
executor.ExecutorType = typeof(ClientExecutor);
}
else
{
throw new InvalidOperationException("Cannot create an executor of type: " + nameof(T));
}
return executor;
}
public abstract void Connect(string hostname);
public abstract void Close();
public abstract ObservableCollection<PartyMember> GetMembers();
public abstract void GetQueue();
public abstract void AddToQueue();
public abstract void RemoveFromQueue();
}
public enum ExecutorType
{
Server,
Client
}
}

View File

@ -1,184 +0,0 @@
using System;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using System.Threading;
using System.Linq.Expressions;
using System.Linq;
using Grpc.Core;
using wellKnown = Google.Protobuf.WellKnownTypes;
using Aurora.Proto.General;
using Aurora.Proto.Party;
using Aurora.Proto.Playback;
using Aurora.Proto.Events;
using Aurora.Services;
namespace Aurora.Executors
{
public class ClientExecutor : BaseExecutor
{
private Channel _channel;
private RemotePartyService.RemotePartyServiceClient _remotePartyClient;
private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient;
private RemoteEventService.RemoteEventServiceClient _remoteEventsClient;
private Task _eventsTask;
CancellationTokenSource _eventCancellationTokenSource;
private ObservableCollection<PartyMember> _partyMembers;
public ClientExecutor()
{
_partyMembers = new ObservableCollection<PartyMember>();
}
#region Properties
public override ObservableCollection<PartyMember> PartyMembers
{
get { return _partyMembers; }
}
#endregion Properties
/// <summary>
/// Initiates the connection to a party.
/// </summary>
/// <param name="hostname">The hostname of the gRPC server</param>
public override void Connect(string hostname)
{
_channel = new Channel(string.Format("{0}:{1}", hostname, SettingsService.Instance.DefaultPort), ChannelCredentials.Insecure);
_remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel);
_remotePlaybackClient = new RemotePlaybackService.RemotePlaybackServiceClient(_channel);
_remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel);
//Assign but don't start task
_eventsTask = new Task(GetEvents);
_eventCancellationTokenSource = new CancellationTokenSource();
JoinParty();
}
/// <summary>
/// Shutdown Connections
/// </summary>
/// <returns></returns>
public override async void Close()
{
_eventCancellationTokenSource.Cancel();
await _channel.ShutdownAsync();
}
public override void AddToQueue()
{
throw new NotImplementedException();
}
public override ObservableCollection<PartyMember> GetMembers()
{
throw new NotImplementedException();
}
public override void GetQueue()
{
throw new NotImplementedException();
}
public override void RemoveFromQueue()
{
throw new NotImplementedException();
}
/// <summary>
/// Join the remote party.
/// </summary>
/// <returns></returns>
private async void JoinParty()
{
await _remotePartyClient.JoinPartyAsync(new JoinPartyRequest
{
UserName = SettingsService.Instance.Username,
});
RefreshMembers();
//Subscribe to events
SubscribeRequest req = new SubscribeRequest();
req.EventTypes.Add(EventType.PartyMemberJoined);
req.EventTypes.Add(EventType.PartyMemberLeft);
try
{
_remoteEventsClient.SubscribeToEvents(req);
}
catch (Exception ex)
{
Console.WriteLine("Error subscribing to events: " + ex.Message);
}
_eventsTask.Start();
}
/// <summary>
/// Refresh members list.
/// </summary>
private void RefreshMembers()
{
MembersResponse resposne = _remotePartyClient.GetPartyMembers(new Empty());
//Add members
foreach (PartyMember member in resposne.Members)
{
_partyMembers.Add(member);
}
//Remove out of date members
foreach (PartyMember member in _partyMembers)
{
if (!resposne.Members.Contains(member))
{
_partyMembers.Remove(member);
}
}
}
/// <summary>
/// Asynchronous function for processing events off of the event stream.
/// </summary>
/// <returns></returns>
private async void GetEvents()
{
using (AsyncServerStreamingCall<BaseEvent> eventStream = _remoteEventsClient.GetEvents(new Empty()))
{
while (!_eventCancellationTokenSource.Token.IsCancellationRequested)
{
if (await eventStream.ResponseStream.MoveNext(_eventCancellationTokenSource.Token))
{
//Convert derived event type
BaseEvent e = eventStream.ResponseStream.Current;
switch (e.DerivedEventCase)
{
case BaseEvent.DerivedEventOneofCase.None:
{
throw new InvalidOperationException();
}
case BaseEvent.DerivedEventOneofCase.PartyMemberJoinedEvent:
{
PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent;
_partyMembers.Add(derivedEvent.Member);
break;
}
case BaseEvent.DerivedEventOneofCase.PartyMemberLeftEvent:
{
PartyMemberJoinedEvent derivedEvent = e.PartyMemberJoinedEvent;
var found = _partyMembers.Where(x => x.Id == derivedEvent.Member.Id);
foreach (PartyMember member in found)
{
_partyMembers.Remove(member);
}
break;
}
}
}
}
}
}
}
}

View File

@ -1,73 +0,0 @@
using System;
using System.Threading.Tasks;
using System.Collections.ObjectModel;
using Aurora.Models;
using Aurora.Executors;
using Aurora.Services;
using Aurora.Proto.Party;
using Aurora.Proto.Playback;
using Aurora.Proto.Events;
using Aurora.RemoteImpl;
namespace Aurora.Executors
{
public class HostExecutor : BaseExecutor
{
RemotePartyServiceImpl _remotePartyServiceImpl;
RemotePlaybackServiceImpl _remotePlaybackImpl;
RemoteEventServiceImpl _remoteEventImpl;
public HostExecutor()
{
_remotePartyServiceImpl = new RemotePartyServiceImpl();
_remotePlaybackImpl = new RemotePlaybackServiceImpl();
_remoteEventImpl = new RemoteEventServiceImpl();
}
public override void Connect(string hostname)
{
//Initialize gRPC server
ServerService.Instance.Initialize(hostname);
//Register grpc RemoteService with singleton server service
ServerService.Instance.RegisterService(RemotePartyService.BindService(_remotePartyServiceImpl));
ServerService.Instance.RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl));
ServerService.Instance.RegisterService(RemoteEventService.BindService(_remoteEventImpl));
//start gRPC server
ServerService.Instance.Start();
}
#region Properties
public override ObservableCollection<PartyMember> PartyMembers
{
get { return _remotePartyServiceImpl.PartyMembers; }
}
#endregion Properties
public override async void Close()
{
await ServerService.Instance.Stop();
}
public override void AddToQueue()
{
throw new NotImplementedException();
}
public override ObservableCollection<PartyMember> GetMembers()
{
throw new NotImplementedException();
}
public override void GetQueue()
{
throw new NotImplementedException();
}
public override void RemoveFromQueue()
{
throw new NotImplementedException();
}
}
}

View File

@ -1,21 +1,45 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Net;
using System.Net.Sockets;
using Grpc.Core; using Grpc.Core;
using Aurora.Proto; using Aurora.RemoteImpl;
using Aurora.Proto.Events;
using Aurora.Proto.Party;
using Aurora.Proto.Playback;
namespace Aurora.Services namespace Aurora.Services
{ {
public class ServerService : BaseService<ServerService> public class ServerService : BaseService<ServerService>
{ {
private string _hostname = "127.0.0.1";
private int _port = SettingsService.Instance.DefaultPort; private int _port = SettingsService.Instance.DefaultPort;
private string _hostname;
private Grpc.Core.Server _server; private Grpc.Core.Server _server;
//Implementation class declarations
RemotePartyServiceImpl _remotePartyServiceImpl;
RemotePlaybackServiceImpl _remotePlaybackImpl;
RemoteEventServiceImpl _remoteEventImpl;
/// <summary> /// <summary>
/// Constructor. Registers GRPC service implementations. /// Constructor. Registers GRPC service implementations.
/// </summary> /// </summary>
public ServerService() public ServerService()
{ {
string host = GetLocalIPAddress();
if (string.IsNullOrWhiteSpace(host))
{
throw new Exception("This device must have a valid IP address");
}
_hostname = host;
_server = new Grpc.Core.Server
{
Ports = { new ServerPort(_hostname, _port, ServerCredentials.Insecure) }
};
} }
public int Port public int Port
@ -28,14 +52,17 @@ namespace Aurora.Services
get { return _hostname; } get { return _hostname; }
} }
public void Initialize(string hostname) public bool Initialized
{ {
this._hostname = hostname; get
_server = new Grpc.Core.Server
{ {
Ports = { new ServerPort(_hostname, _port, ServerCredentials.Insecure) } return (_remoteEventImpl != null &&
}; _remotePartyServiceImpl != null &&
_remotePlaybackImpl != null &&
_server != null);
} }
}
/// <summary> /// <summary>
/// Start Server /// Start Server
@ -44,7 +71,22 @@ namespace Aurora.Services
{ {
try try
{ {
Console.WriteLine(string.Format("Starting gRPC server at hostname: {0}, port: {1}", _hostname, _port)); Console.WriteLine(string.Format("Starting gRPC server at hostname: {0}, port: {1}", _hostname, _port));
if (!Initialized)
{
//Construct implementations
_remotePartyServiceImpl = new RemotePartyServiceImpl();
_remotePlaybackImpl = new RemotePlaybackServiceImpl();
_remoteEventImpl = new RemoteEventServiceImpl();
// Register grpc RemoteService with singleton server service
RegisterService(RemotePartyService.BindService(_remotePartyServiceImpl));
RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl));
RegisterService(RemoteEventService.BindService(_remoteEventImpl));
}
_server.Start(); _server.Start();
} }
catch (Exception ex) catch (Exception ex)
@ -71,9 +113,23 @@ namespace Aurora.Services
}; };
} }
public void RegisterService(ServerServiceDefinition definition) private void RegisterService(ServerServiceDefinition definition)
{ {
_server.Services.Add(definition); _server.Services.Add(definition);
} }
public static string GetLocalIPAddress()
{
string returnIp = "";
var host = Dns.GetHostEntry(Dns.GetHostName());
foreach (var ip in host.AddressList)
{
if (ip.AddressFamily == AddressFamily.InterNetwork)
{
returnIp = ip.ToString();
}
}
return returnIp;
}
} }
} }