using System;
using System.Collections.ObjectModel;
using System.Threading.Tasks;
using System.Threading;
using System.Linq.Expressions;
using System.Linq;
using Grpc.Core;
using wellKnown = Google.Protobuf.WellKnownTypes;
using Aurora.Proto.General;
using Aurora.Proto.Party;
using Aurora.Proto.Playback;
using Aurora.Proto.Events;
using Aurora.Services;

namespace Aurora.Executors
{
    /// <summary>
    /// Executor that connects to a remote party over a gRPC channel, joins it,
    /// and keeps a local list of party members up to date from the remote event stream.
    /// </summary>
    public class ClientExecutor : BaseExecutor
    {
        private Channel _channel;
        private RemotePartyService.RemotePartyServiceClient _remotePartyClient;
        private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient;
        private RemoteEventService.RemoteEventServiceClient _remoteEventsClient;

        // Background task that reads the event stream; null until the party has been joined.
        private Task _eventsTask;

        // Signals the event loop to stop; created in Connect, cancelled in Close.
        private CancellationTokenSource _eventCancellationTokenSource;

        private ObservableCollection<PartyMember> _partyMembers;

        /// <summary>
        /// Creates an executor with an empty member list. No connection is made until
        /// <see cref="Connect"/> is called.
        /// </summary>
        public ClientExecutor()
        {
            _partyMembers = new ObservableCollection<PartyMember>();
        }

        #region Properties

        /// <summary>
        /// The locally tracked members of the party.
        /// </summary>
        public override ObservableCollection<PartyMember> PartyMembers
        {
            get { return _partyMembers; }
        }

        #endregion Properties

        /// <summary>
        /// Initiates the connection to a party.
        /// </summary>
        /// <param name="hostname">The hostname of the gRPC server</param>
        public override void Connect(string hostname)
        {
            _channel = new Channel(
                string.Format("{0}:{1}", hostname, SettingsService.Instance.DefaultPort),
                ChannelCredentials.Insecure);

            _remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel);
            _remotePlaybackClient = new RemotePlaybackService.RemotePlaybackServiceClient(_channel);
            _remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel);

            _eventCancellationTokenSource = new CancellationTokenSource();

            // Fire-and-forget: Connect is a synchronous override, so the join runs in the
            // background. JoinPartyAsync catches and logs its own failures.
            JoinPartyAsync();
        }

        /// <summary>
        /// Shutdown Connections
        /// </summary>
        /// <remarks>
        /// Safe to call before <see cref="Connect"/>; in that case it does nothing.
        /// </remarks>
        public override async void Close()
        {
            if (_eventCancellationTokenSource != null)
            {
                _eventCancellationTokenSource.Cancel();
            }

            if (_channel == null)
            {
                return;
            }

            // This method is async void (the override signature is fixed), so an escaping
            // exception would be unhandled; log it instead.
            try
            {
                await _channel.ShutdownAsync();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error shutting down channel: " + ex.Message);
            }
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public override void AddToQueue()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public override ObservableCollection<PartyMember> GetMembers()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public override void GetQueue()
        {
            throw new NotImplementedException();
        }

        /// <summary>Not implemented.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public override void RemoveFromQueue()
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Join the remote party, load the current member list, subscribe to
        /// member joined/left events and start the background event listener.
        /// </summary>
        /// <returns>A task that completes once the listener has been started or the join has failed.</returns>
        private async Task JoinPartyAsync()
        {
            try
            {
                await _remotePartyClient.JoinPartyAsync(new JoinPartyRequest
                {
                    UserName = SettingsService.Instance.Username,
                });

                RefreshMembers();
            }
            catch (Exception ex)
            {
                // Nobody awaits this task, so a failure must be reported here.
                // Without a successful join there is nothing to listen to.
                Console.WriteLine("Error joining party: " + ex.Message);
                return;
            }

            //Subscribe to events
            SubscribeRequest req = new SubscribeRequest();
            req.EventTypes.Add(EventType.PartyMemberJoined);
            req.EventTypes.Add(EventType.PartyMemberLeft);

            try
            {
                _remoteEventsClient.SubscribeToEvents(req);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error subscribing to events: " + ex.Message);
            }

            // The listener is started even if subscribing failed (best effort).
            _eventsTask = Task.Run(() => GetEventsAsync());
        }

        /// <summary>
        /// Refresh members list so that it matches the members reported by the remote party.
        /// </summary>
        private void RefreshMembers()
        {
            MembersResponse response = _remotePartyClient.GetPartyMembers(new Empty());

            //Remove out of date members. Snapshot first: removing from a collection
            //while enumerating it throws InvalidOperationException.
            PartyMember[] stale = _partyMembers
                .Where(existing => !response.Members.Contains(existing))
                .ToArray();

            foreach (PartyMember member in stale)
            {
                _partyMembers.Remove(member);
            }

            //Add members that are not tracked yet, so repeated refreshes do not create duplicates.
            //NOTE(review): relies on PartyMember equality (generated protobuf messages
            //normally compare by value) - confirm.
            foreach (PartyMember member in response.Members)
            {
                if (!_partyMembers.Contains(member))
                {
                    _partyMembers.Add(member);
                }
            }
        }

        /// <summary>
        /// Asynchronous function for processing events off of the event stream.
        /// Runs until cancellation is requested, the stream ends, or an error occurs.
        /// </summary>
        /// <returns>A task that completes when the listener stops. Errors are logged, not thrown.</returns>
        private async Task GetEventsAsync()
        {
            CancellationToken token = _eventCancellationTokenSource.Token;

            try
            {
                using (AsyncServerStreamingCall<BaseEvent> eventStream = _remoteEventsClient.GetEvents(new Empty()))
                {
                    // MoveNext returning false means the stream is finished; leave the
                    // loop instead of polling a finished stream.
                    while (!token.IsCancellationRequested
                        && await eventStream.ResponseStream.MoveNext(token))
                    {
                        HandleEvent(eventStream.ResponseStream.Current);
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation was requested; normal shutdown.
            }
            catch (RpcException ex)
            {
                // A cancelled call is reported with StatusCode.Cancelled; treat it as a
                // normal shutdown and only log other failures.
                if (ex.StatusCode != StatusCode.Cancelled)
                {
                    Console.WriteLine("Error reading event stream: " + ex.Message);
                }
            }
            catch (Exception ex)
            {
                // Task boundary: nobody awaits this task, so log rather than lose the error.
                Console.WriteLine("Error processing events: " + ex.Message);
            }
        }

        /// <summary>
        /// Applies a single event from the event stream to the local member list.
        /// </summary>
        /// <param name="e">The event received from the stream.</param>
        /// <exception cref="InvalidOperationException">The event carries no derived event.</exception>
        private void HandleEvent(BaseEvent e)
        {
            //Convert derived event type
            switch (e.DerivedEventCase)
            {
                case BaseEvent.DerivedEventOneofCase.None:
                    {
                        throw new InvalidOperationException("Received an event with no derived event set.");
                    }
                case BaseEvent.DerivedEventOneofCase.PartyMemberJoinedEvent:
                    {
                        PartyMemberJoinedEvent joinedEvent = e.PartyMemberJoinedEvent;
                        _partyMembers.Add(joinedEvent.Member);
                        break;
                    }
                case BaseEvent.DerivedEventOneofCase.PartyMemberLeftEvent:
                    {
                        // Read the oneof field that matches this case; the joined-event
                        // field is not the one set when a member leaves.
                        // NOTE(review): assumes the left event exposes Member like the
                        // joined event does - confirm against the proto definition.
                        var leftEvent = e.PartyMemberLeftEvent;

                        // Snapshot the matches so the collection is not modified while
                        // the query over it is still being enumerated.
                        PartyMember[] departed = _partyMembers
                            .Where(x => x.Id == leftEvent.Member.Id)
                            .ToArray();

                        foreach (PartyMember member in departed)
                        {
                            _partyMembers.Remove(member);
                        }
                        break;
                    }
            }
        }
    }
}