From 1acc383e902aff8a68ddfd9231a1fb0be0943185 Mon Sep 17 00:00:00 2001 From: watsonb8 Date: Mon, 11 Nov 2019 15:10:08 -0500 Subject: [PATCH] First pass at sync working. Need to ignore for special cases --- Aurora/Aurora.csproj | 4 ++ Aurora/Design/Views/Party/PartyViewModel.cs | 3 +- Aurora/Models/Media/RemoteAudio.cs | 33 +++++---- Aurora/Proto/party.proto | 5 -- Aurora/Proto/playback.proto | 13 ++++ Aurora/Proto/sync.proto | 15 ++++ Aurora/RemoteImpl/RemoteEventImpl.cs | 4 +- Aurora/RemoteImpl/RemotePartyImpl.cs | 19 ----- Aurora/RemoteImpl/RemotePlaybackImpl.cs | 33 +++++++++ Aurora/RemoteImpl/RemoteSyncImpl.cs | 48 +++++++++++++ .../Services/ClientService/ClientService.cs | 20 +++++- .../Services/PlayerService/PlayerService.cs | 62 +++++++++++++++++ Aurora/Services/ServerService.cs | 12 +++- Aurora/Utils/TimeUtils.cs | 69 +++++++++++++++++++ 14 files changed, 296 insertions(+), 44 deletions(-) create mode 100644 Aurora/Proto/playback.proto create mode 100644 Aurora/Proto/sync.proto create mode 100644 Aurora/RemoteImpl/RemotePlaybackImpl.cs create mode 100644 Aurora/RemoteImpl/RemoteSyncImpl.cs create mode 100644 Aurora/Utils/TimeUtils.cs diff --git a/Aurora/Aurora.csproj b/Aurora/Aurora.csproj index bea94d1..8c29adf 100644 --- a/Aurora/Aurora.csproj +++ b/Aurora/Aurora.csproj @@ -96,5 +96,9 @@ Include="Proto\party.proto"/> + + \ No newline at end of file diff --git a/Aurora/Design/Views/Party/PartyViewModel.cs b/Aurora/Design/Views/Party/PartyViewModel.cs index 6d831d9..d6359a8 100644 --- a/Aurora/Design/Views/Party/PartyViewModel.cs +++ b/Aurora/Design/Views/Party/PartyViewModel.cs @@ -384,7 +384,8 @@ namespace Aurora.Design.Views.Party RemoteAudio remote = new RemoteAudio(data.Id, meta, - _client.RemotePartyClient); + _client.RemotePlaybackClient, + _client.RemoteSyncClient); Queue.Add(remote); } diff --git a/Aurora/Models/Media/RemoteAudio.cs b/Aurora/Models/Media/RemoteAudio.cs index f3dfb53..6b2ce4e 100644 --- a/Aurora/Models/Media/RemoteAudio.cs +++ b/Aurora/Models/Media/RemoteAudio.cs @@ -2,29 +2,26 @@ using System; using System.IO; using System.Threading; using System.Threading.Tasks; -using Aurora.Proto.Party; using Aurora.Proto.General; +using Aurora.Proto.Playback; +using Aurora.Proto.Sync; namespace Aurora.Models.Media { public class RemoteAudio : BaseMedia { - private RemotePartyService.RemotePartyServiceClient _client; + private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient; + private RemoteSyncService.RemoteSyncServiceClient _remoteSyncClient; private CancellationTokenSource _cancellationTokenSource; #region Constructor - public RemoteAudio(string id, RemotePartyService.RemotePartyServiceClient client) + public RemoteAudio(string id, AudioMetadata metadata, + RemotePlaybackService.RemotePlaybackServiceClient playbackClient, + RemoteSyncService.RemoteSyncServiceClient syncClient) { this.Id = id; - this._client = client; - - _cancellationTokenSource = new CancellationTokenSource(); - } - - public RemoteAudio(string id, AudioMetadata metadata, RemotePartyService.RemotePartyServiceClient client) - { - this.Id = id; - this._client = client; + this._remotePlaybackClient = playbackClient; + this._remoteSyncClient = syncClient; this.Metadata = metadata; _cancellationTokenSource = new CancellationTokenSource(); @@ -41,6 +38,14 @@ namespace Aurora.Models.Media get { return MediaTypeEnum.Audio; } } + public RemoteSyncService.RemoteSyncServiceClient RemoteSyncClient + { + get + { + return _remoteSyncClient; + } + } + #endregion Properties /// @@ -49,8 +54,7 @@ namespace Aurora.Models.Media public override async Task Load() { this.DataStream = new MemoryStream(); - - using (var call = _client.GetSongStream(new SongRequest() { Id = this.Id })) + using (var call = _remotePlaybackClient.GetSongStream(new SongRequest() { Id = this.Id })) { while (await call.ResponseStream.MoveNext(_cancellationTokenSource.Token)) { @@ -58,7 +62,6 @@ namespace Aurora.Models.Media byte[] buffer = chunk.Content.ToByteArray(); await this.DataStream.WriteAsync(buffer, 0, buffer.Length); - Console.WriteLine(string.Format("Wrote byte chunk of size {0} to output stream", buffer.Length)); } Console.WriteLine("Done receiving stream"); diff --git a/Aurora/Proto/party.proto b/Aurora/Proto/party.proto index 924aec5..d5e54a4 100644 --- a/Aurora/Proto/party.proto +++ b/Aurora/Proto/party.proto @@ -10,7 +10,6 @@ service RemotePartyService { rpc LeaveParty(LeavePartyRequest) returns (LeavePartyResponse); rpc GetPartyMembers(Aurora.Proto.General.Empty) returns (MembersResponse); rpc GetQueue(Aurora.Proto.General.Empty) returns (QueueResponse); - rpc GetSongStream(SongRequest) returns (stream Aurora.Proto.General.Chunk) {}; } message JoinPartyRequest { @@ -56,7 +55,3 @@ message RemoteMediaData { string album = 4; string duration = 5; } - -message SongRequest { - string id = 1; -} diff --git a/Aurora/Proto/playback.proto b/Aurora/Proto/playback.proto new file mode 100644 index 0000000..0f6832a --- /dev/null +++ b/Aurora/Proto/playback.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package Aurora.Proto.Playback; + +import "Proto/general.proto"; + +service RemotePlaybackService { + rpc GetSongStream(SongRequest) returns (stream Aurora.Proto.General.Chunk) {}; +} + +message SongRequest { + string id = 1; +} \ No newline at end of file diff --git a/Aurora/Proto/sync.proto b/Aurora/Proto/sync.proto new file mode 100644 index 0000000..ef31739 --- /dev/null +++ b/Aurora/Proto/sync.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package Aurora.Proto.Sync; + +import "Proto/general.proto"; +import "google/protobuf/timestamp.proto"; + +service RemoteSyncService { + rpc GetMediaSync(Aurora.Proto.General.Empty) returns (stream Sync) {}; +} + +message Sync { + int64 serverTime = 1; + float trackTime = 2; +} \ No newline at end of file diff --git a/Aurora/RemoteImpl/RemoteEventImpl.cs b/Aurora/RemoteImpl/RemoteEventImpl.cs index 7183c33..6350f97 100644 --- a/Aurora/RemoteImpl/RemoteEventImpl.cs +++ b/Aurora/RemoteImpl/RemoteEventImpl.cs @@ -23,7 +23,9 @@ namespace Aurora.RemoteImpl /// The response stream /// gRPC client context /// - public async override Task GetEvents(EventsRequest request, Grpc.Core.IServerStreamWriter responseStream, Grpc.Core.ServerCallContext context) + public async override Task GetEvents(EventsRequest request, + Grpc.Core.IServerStreamWriter responseStream, + Grpc.Core.ServerCallContext context) { string peerId = Combine(new string[] { context.Peer, request.ClientId }); Console.WriteLine(string.Format("SERVER - Events request received from peer: {0}", peerId)); diff --git a/Aurora/RemoteImpl/RemotePartyImpl.cs b/Aurora/RemoteImpl/RemotePartyImpl.cs index 14b53e0..ddbca26 100644 --- a/Aurora/RemoteImpl/RemotePartyImpl.cs +++ b/Aurora/RemoteImpl/RemotePartyImpl.cs @@ -135,24 +135,5 @@ namespace Aurora.RemoteImpl return Task.FromResult(mediaList); } - - public override async Task GetSongStream(SongRequest request, - Grpc.Core.IServerStreamWriter responseStream, - Grpc.Core.ServerCallContext context) - { - BaseMedia song = LibraryService.Instance.GetSong(request.Id); - await song.Load(); - - //Send stream - Console.WriteLine("Begin sending file"); - byte[] buffer = new byte[2048]; // read in chunks of 2KB - int bytesRead; - while ((bytesRead = song.DataStream.Read(buffer, 0, buffer.Length)) > 0) - { - Google.Protobuf.ByteString bufferByteString = Google.Protobuf.ByteString.CopyFrom(buffer); - await responseStream.WriteAsync(new Chunk { Content = bufferByteString }); - } - Console.WriteLine("Done sending file"); - } } } \ No newline at end of file diff --git a/Aurora/RemoteImpl/RemotePlaybackImpl.cs b/Aurora/RemoteImpl/RemotePlaybackImpl.cs new file mode 100644 index 0000000..63d0fa0 --- /dev/null +++ b/Aurora/RemoteImpl/RemotePlaybackImpl.cs @@ -0,0 +1,33 @@ +using System; +using System.Threading.Tasks; +using System.Threading; +using System.Collections.Generic; +using Aurora.Services; +using Aurora.Proto.Playback; +using Aurora.Proto.General; +using Aurora.Models.Media; + +namespace Aurora.RemoteImpl +{ + public class RemotePlaybackServiceImpl : RemotePlaybackService.RemotePlaybackServiceBase + { + public override async Task GetSongStream(SongRequest request, + Grpc.Core.IServerStreamWriter responseStream, + Grpc.Core.ServerCallContext context) + { + BaseMedia song = LibraryService.Instance.GetSong(request.Id); + await song.Load(); + + //Send stream + Console.WriteLine("Begin sending file"); + byte[] buffer = new byte[2048]; // read in chunks of 2KB + int bytesRead; + while ((bytesRead = song.DataStream.Read(buffer, 0, buffer.Length)) > 0) + { + Google.Protobuf.ByteString bufferByteString = Google.Protobuf.ByteString.CopyFrom(buffer); + await responseStream.WriteAsync(new Chunk { Content = bufferByteString }); + } + Console.WriteLine("Done sending file"); + } + } +} \ No newline at end of file diff --git a/Aurora/RemoteImpl/RemoteSyncImpl.cs b/Aurora/RemoteImpl/RemoteSyncImpl.cs new file mode 100644 index 0000000..0a8f7d9 --- /dev/null +++ b/Aurora/RemoteImpl/RemoteSyncImpl.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading.Tasks; +using Aurora.Proto.Sync; +using Aurora.Proto.General; +using Aurora.Services.PlayerService; + +namespace Aurora.RemoteImpl +{ + public class RemoteSyncServiceImpl : RemoteSyncService.RemoteSyncServiceBase + { + /// + /// RPC for getting a stream of media syncs + /// + /// + /// + /// + /// + public override async Task GetMediaSync(Empty request, + Grpc.Core.IServerStreamWriter responseStream, + Grpc.Core.ServerCallContext context) + { + bool songIsPlaying = true; + PlaybackStateChangedEventHandler playbackStateChanged = (sender, e) => + { + songIsPlaying = false; + }; + + PlayerService.Instance.PlaybackStateChanged += playbackStateChanged; + + while (songIsPlaying) + { + DateTime time = Utils.TimeUtils.GetNetworkTime(); + float position = PlayerService.Instance.CurrentMediaTime; + float length = PlayerService.Instance.CurrentMediaLength; + + float trackTime = length * position; + + Sync sync = new Sync() + { + TrackTime = trackTime, + ServerTime = time.Ticks + }; + await responseStream.WriteAsync(sync); + await Task.Delay(10000); + } + } + } +} \ No newline at end of file diff --git a/Aurora/Services/ClientService/ClientService.cs b/Aurora/Services/ClientService/ClientService.cs index 1dd04b4..f64aeaa 100644 --- a/Aurora/Services/ClientService/ClientService.cs +++ b/Aurora/Services/ClientService/ClientService.cs @@ -4,7 +4,8 @@ using System.Threading; using Grpc.Core; using Aurora.Proto.Events; using Aurora.Proto.Party; -using Aurora.Services.ClientService; +using Aurora.Proto.Playback; +using Aurora.Proto.Sync; namespace Aurora.Services.ClientService { @@ -12,6 +13,9 @@ namespace Aurora.Services.ClientService { private RemotePartyService.RemotePartyServiceClient _remotePartyClient; private RemoteEventService.RemoteEventServiceClient _remoteEventsClient; + private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient; + private RemoteSyncService.RemoteSyncServiceClient _remoteSyncClient; + private Channel _channel; CancellationTokenSource _eventCancellationTokenSource; @@ -34,6 +38,16 @@ namespace Aurora.Services.ClientService get { return _remoteEventsClient; } } + public RemotePlaybackService.RemotePlaybackServiceClient RemotePlaybackClient + { + get { return _remotePlaybackClient; } + } + + public RemoteSyncService.RemoteSyncServiceClient RemoteSyncClient + { + get { return _remoteSyncClient; } + } + public bool IsStarted { get @@ -51,6 +65,8 @@ namespace Aurora.Services.ClientService _remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel); _remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel); + _remotePlaybackClient = new RemotePlaybackService.RemotePlaybackServiceClient(_channel); + _remoteSyncClient = new RemoteSyncService.RemoteSyncServiceClient(_channel); //Assign but don't start task _eventCancellationTokenSource = new CancellationTokenSource(); @@ -63,6 +79,8 @@ namespace Aurora.Services.ClientService _remotePartyClient = null; _remoteEventsClient = null; + _remotePlaybackClient = null; + _remoteSyncClient = null; } /// diff --git a/Aurora/Services/PlayerService/PlayerService.cs b/Aurora/Services/PlayerService/PlayerService.cs index 2241556..6e12539 100644 --- a/Aurora/Services/PlayerService/PlayerService.cs +++ b/Aurora/Services/PlayerService/PlayerService.cs @@ -1,6 +1,9 @@ using System; using System.Threading.Tasks; +using System.Threading; +using Grpc.Core; using Aurora.Models.Media; +using Aurora.Proto.Sync; using LibVLCSharp.Shared; namespace Aurora.Services.PlayerService @@ -47,6 +50,22 @@ namespace Aurora.Services.PlayerService return _currentMedia == media; } + public float CurrentMediaTime + { + get + { + return _mediaPlayer.Position; + } + } + + public long CurrentMediaLength + { + get + { + return _mediaPlayer.Length; + } + } + /// /// Load media into the media player. /// @@ -78,7 +97,50 @@ namespace Aurora.Services.PlayerService { PlaybackState oldState = _state; _state = PlaybackState.Playing; + _mediaPlayer.Play(); + //Use sync RPC for remote audio + if (_currentMedia is RemoteAudio) + { + RemoteAudio media = _currentMedia as RemoteAudio; + RemoteSyncService.RemoteSyncServiceClient _remoteSyncClient = media.RemoteSyncClient; + + //Sync playback in a separate task + //Task completes when host stops syncing (when a song is complete) + Task syncTask = new Task(async () => + { + CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); + using (AsyncServerStreamingCall syncStream = _remoteSyncClient + .GetMediaSync(new Proto.General.Empty())) + { + try + { + while (await syncStream.ResponseStream.MoveNext(cancellationTokenSource.Token)) + { + Sync sync = new Sync(syncStream.ResponseStream.Current); + if (sync != null) + { + //Adjust position based on sync + DateTime localTime = Utils.TimeUtils.GetNetworkTime(); + //Get offset converted to milliseconds + float offset = ((localTime.Ticks - sync.ServerTime) * 100) / (1000 * 1000); + + float length = CurrentMediaLength; + float position = (sync.TrackTime + offset) / length; + + _mediaPlayer.Position = position; + } + } + } + catch (Exception ex) + { + Console.WriteLine("Exception caught while attempting to sync: " + ex.Message); + } + } + }); + + syncTask.Start(); + } if (PlaybackStateChanged != null) { diff --git a/Aurora/Services/ServerService.cs b/Aurora/Services/ServerService.cs index 6cee1e8..3205ea1 100644 --- a/Aurora/Services/ServerService.cs +++ b/Aurora/Services/ServerService.cs @@ -6,6 +6,8 @@ using Grpc.Core; using Aurora.RemoteImpl; using Aurora.Proto.Events; using Aurora.Proto.Party; +using Aurora.Proto.Playback; +using Aurora.Proto.Sync; namespace Aurora.Services @@ -17,8 +19,10 @@ namespace Aurora.Services private Grpc.Core.Server _server; //Implementation class declarations - RemotePartyServiceImpl _remotePartyServiceImpl; - RemoteEventServiceImpl _remoteEventImpl; + private RemotePartyServiceImpl _remotePartyServiceImpl; + private RemoteEventServiceImpl _remoteEventImpl; + private RemotePlaybackServiceImpl _remotePlaybackImpl; + private RemoteSyncServiceImpl _remoteSyncImpl; /// /// Constructor. Registers GRPC service implementations. @@ -76,10 +80,14 @@ namespace Aurora.Services //Construct implementations _remotePartyServiceImpl = new RemotePartyServiceImpl(); _remoteEventImpl = new RemoteEventServiceImpl(); + _remotePlaybackImpl = new RemotePlaybackServiceImpl(); + _remoteSyncImpl = new RemoteSyncServiceImpl(); // Register grpc RemoteService with singleton server service RegisterService(RemotePartyService.BindService(_remotePartyServiceImpl)); RegisterService(RemoteEventService.BindService(_remoteEventImpl)); + RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl)); + RegisterService(RemoteSyncService.BindService(_remoteSyncImpl)); } _server.Start(); } diff --git a/Aurora/Utils/TimeUtils.cs b/Aurora/Utils/TimeUtils.cs new file mode 100644 index 0000000..720e9a7 --- /dev/null +++ b/Aurora/Utils/TimeUtils.cs @@ -0,0 +1,69 @@ +using System; +using System.Net; +using System.Net.Sockets; + +namespace Aurora.Utils +{ + public static class TimeUtils + { + public static DateTime GetNetworkTime() + { + //default Windows time server + const string ntpServer = "time.windows.com"; + + // NTP message size - 16 bytes of the digest (RFC 2030) + var ntpData = new byte[48]; + + //Setting the Leap Indicator, Version Number and Mode values + ntpData[0] = 0x1B; //LI = 0 (no warning), VN = 3 (IPv4 only), Mode = 3 (Client Mode) + + var addresses = Dns.GetHostEntry(ntpServer).AddressList; + + //The UDP port number assigned to NTP is 123 + var ipEndPoint = new IPEndPoint(addresses[0], 123); + //NTP uses UDP + + using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp)) + { + socket.Connect(ipEndPoint); + + //Stops code hang if NTP is blocked + socket.ReceiveTimeout = 3000; + + socket.Send(ntpData); + socket.Receive(ntpData); + socket.Close(); + } + + //Offset to get to the "Transmit Timestamp" field (time at which the reply + //departed the server for the client, in 64-bit timestamp format." + const byte serverReplyTime = 40; + + //Get the seconds part + ulong intPart = BitConverter.ToUInt32(ntpData, serverReplyTime); + + //Get the seconds fraction + ulong fractPart = BitConverter.ToUInt32(ntpData, serverReplyTime + 4); + + //Convert From big-endian to little-endian + intPart = SwapEndianness(intPart); + fractPart = SwapEndianness(fractPart); + + var milliseconds = (intPart * 1000) + ((fractPart * 1000) / 0x100000000L); + + //**UTC** time + var networkDateTime = (new DateTime(1900, 1, 1, 0, 0, 0, DateTimeKind.Utc)).AddMilliseconds((long)milliseconds); + + return networkDateTime.ToLocalTime(); + } + + // stackoverflow.com/a/3294698/162671 + static uint SwapEndianness(ulong x) + { + return (uint)(((x & 0x000000ff) << 24) + + ((x & 0x0000ff00) << 8) + + ((x & 0x00ff0000) >> 8) + + ((x & 0xff000000) >> 24)); + } + } +} \ No newline at end of file