First pass at sync working. Need to ignore for special cases

This commit is contained in:
watsonb8 2019-11-11 15:10:08 -05:00
parent 5f035e9bcb
commit 1acc383e90
14 changed files with 296 additions and 44 deletions

View File

@ -96,5 +96,9 @@
Include="Proto\party.proto"/> Include="Proto\party.proto"/>
<Protobuf <Protobuf
Include="Proto\events.proto"/> Include="Proto\events.proto"/>
<Protobuf
Include="Proto\playback.proto"/>
<Protobuf
Include="Proto\sync.proto"/>
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -384,7 +384,8 @@ namespace Aurora.Design.Views.Party
RemoteAudio remote = new RemoteAudio(data.Id, RemoteAudio remote = new RemoteAudio(data.Id,
meta, meta,
_client.RemotePartyClient); _client.RemotePlaybackClient,
_client.RemoteSyncClient);
Queue.Add(remote); Queue.Add(remote);
} }

View File

@ -2,29 +2,26 @@ using System;
using System.IO; using System.IO;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Aurora.Proto.Party;
using Aurora.Proto.General; using Aurora.Proto.General;
using Aurora.Proto.Playback;
using Aurora.Proto.Sync;
namespace Aurora.Models.Media namespace Aurora.Models.Media
{ {
public class RemoteAudio : BaseMedia public class RemoteAudio : BaseMedia
{ {
private RemotePartyService.RemotePartyServiceClient _client; private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient;
private RemoteSyncService.RemoteSyncServiceClient _remoteSyncClient;
private CancellationTokenSource _cancellationTokenSource; private CancellationTokenSource _cancellationTokenSource;
#region Constructor #region Constructor
public RemoteAudio(string id, RemotePartyService.RemotePartyServiceClient client) public RemoteAudio(string id, AudioMetadata metadata,
RemotePlaybackService.RemotePlaybackServiceClient playbackClient,
RemoteSyncService.RemoteSyncServiceClient syncClient)
{ {
this.Id = id; this.Id = id;
this._client = client; this._remotePlaybackClient = playbackClient;
this._remoteSyncClient = syncClient;
_cancellationTokenSource = new CancellationTokenSource();
}
public RemoteAudio(string id, AudioMetadata metadata, RemotePartyService.RemotePartyServiceClient client)
{
this.Id = id;
this._client = client;
this.Metadata = metadata; this.Metadata = metadata;
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();
@ -41,6 +38,14 @@ namespace Aurora.Models.Media
get { return MediaTypeEnum.Audio; } get { return MediaTypeEnum.Audio; }
} }
public RemoteSyncService.RemoteSyncServiceClient RemoteSyncClient
{
get
{
return _remoteSyncClient;
}
}
#endregion Properties #endregion Properties
/// <summary> /// <summary>
@ -49,8 +54,7 @@ namespace Aurora.Models.Media
public override async Task Load() public override async Task Load()
{ {
this.DataStream = new MemoryStream(); this.DataStream = new MemoryStream();
using (var call = _remotePlaybackClient.GetSongStream(new SongRequest() { Id = this.Id }))
using (var call = _client.GetSongStream(new SongRequest() { Id = this.Id }))
{ {
while (await call.ResponseStream.MoveNext(_cancellationTokenSource.Token)) while (await call.ResponseStream.MoveNext(_cancellationTokenSource.Token))
{ {
@ -58,7 +62,6 @@ namespace Aurora.Models.Media
byte[] buffer = chunk.Content.ToByteArray(); byte[] buffer = chunk.Content.ToByteArray();
await this.DataStream.WriteAsync(buffer, 0, buffer.Length); await this.DataStream.WriteAsync(buffer, 0, buffer.Length);
Console.WriteLine(string.Format("Wrote byte chunk of size {0} to output stream", buffer.Length));
} }
Console.WriteLine("Done receiving stream"); Console.WriteLine("Done receiving stream");

View File

@ -10,7 +10,6 @@ service RemotePartyService {
rpc LeaveParty(LeavePartyRequest) returns (LeavePartyResponse); rpc LeaveParty(LeavePartyRequest) returns (LeavePartyResponse);
rpc GetPartyMembers(Aurora.Proto.General.Empty) returns (MembersResponse); rpc GetPartyMembers(Aurora.Proto.General.Empty) returns (MembersResponse);
rpc GetQueue(Aurora.Proto.General.Empty) returns (QueueResponse); rpc GetQueue(Aurora.Proto.General.Empty) returns (QueueResponse);
rpc GetSongStream(SongRequest) returns (stream Aurora.Proto.General.Chunk) {};
} }
message JoinPartyRequest { message JoinPartyRequest {
@ -56,7 +55,3 @@ message RemoteMediaData {
string album = 4; string album = 4;
string duration = 5; string duration = 5;
} }
message SongRequest {
string id = 1;
}

View File

@ -0,0 +1,13 @@
syntax = "proto3";
package Aurora.Proto.Playback;
import "Proto/general.proto";
service RemotePlaybackService {
rpc GetSongStream(SongRequest) returns (stream Aurora.Proto.General.Chunk) {};
}
message SongRequest {
string id = 1;
}

15
Aurora/Proto/sync.proto Normal file
View File

@ -0,0 +1,15 @@
syntax = "proto3";
package Aurora.Proto.Sync;
import "Proto/general.proto";
import "google/protobuf/timestamp.proto";
service RemoteSyncService {
rpc GetMediaSync(Aurora.Proto.General.Empty) returns (stream Sync) {};
}
message Sync {
int64 serverTime = 1;
float trackTime = 2;
}

View File

@ -23,7 +23,9 @@ namespace Aurora.RemoteImpl
/// <param name="responseStream">The response stream</param> /// <param name="responseStream">The response stream</param>
/// <param name="context">gRPC client context</param> /// <param name="context">gRPC client context</param>
/// <returns></returns> /// <returns></returns>
public async override Task GetEvents(EventsRequest request, Grpc.Core.IServerStreamWriter<BaseEvent> responseStream, Grpc.Core.ServerCallContext context) public async override Task GetEvents(EventsRequest request,
Grpc.Core.IServerStreamWriter<BaseEvent> responseStream,
Grpc.Core.ServerCallContext context)
{ {
string peerId = Combine(new string[] { context.Peer, request.ClientId }); string peerId = Combine(new string[] { context.Peer, request.ClientId });
Console.WriteLine(string.Format("SERVER - Events request received from peer: {0}", peerId)); Console.WriteLine(string.Format("SERVER - Events request received from peer: {0}", peerId));

View File

@ -135,24 +135,5 @@ namespace Aurora.RemoteImpl
return Task.FromResult(mediaList); return Task.FromResult(mediaList);
} }
public override async Task GetSongStream(SongRequest request,
Grpc.Core.IServerStreamWriter<Chunk> responseStream,
Grpc.Core.ServerCallContext context)
{
BaseMedia song = LibraryService.Instance.GetSong(request.Id);
await song.Load();
//Send stream
Console.WriteLine("Begin sending file");
byte[] buffer = new byte[2048]; // read in chunks of 2KB
int bytesRead;
while ((bytesRead = song.DataStream.Read(buffer, 0, buffer.Length)) > 0)
{
Google.Protobuf.ByteString bufferByteString = Google.Protobuf.ByteString.CopyFrom(buffer);
await responseStream.WriteAsync(new Chunk { Content = bufferByteString });
}
Console.WriteLine("Done sending file");
}
} }
} }

View File

@ -0,0 +1,33 @@
using System;
using System.Threading.Tasks;
using System.Threading;
using System.Collections.Generic;
using Aurora.Services;
using Aurora.Proto.Playback;
using Aurora.Proto.General;
using Aurora.Models.Media;
namespace Aurora.RemoteImpl
{
public class RemotePlaybackServiceImpl : RemotePlaybackService.RemotePlaybackServiceBase
{
public override async Task GetSongStream(SongRequest request,
Grpc.Core.IServerStreamWriter<Chunk> responseStream,
Grpc.Core.ServerCallContext context)
{
BaseMedia song = LibraryService.Instance.GetSong(request.Id);
await song.Load();
//Send stream
Console.WriteLine("Begin sending file");
byte[] buffer = new byte[2048]; // read in chunks of 2KB
int bytesRead;
while ((bytesRead = song.DataStream.Read(buffer, 0, buffer.Length)) > 0)
{
Google.Protobuf.ByteString bufferByteString = Google.Protobuf.ByteString.CopyFrom(buffer);
await responseStream.WriteAsync(new Chunk { Content = bufferByteString });
}
Console.WriteLine("Done sending file");
}
}
}

View File

@ -0,0 +1,48 @@
using System;
using System.Threading.Tasks;
using Aurora.Proto.Sync;
using Aurora.Proto.General;
using Aurora.Services.PlayerService;
namespace Aurora.RemoteImpl
{
public class RemoteSyncServiceImpl : RemoteSyncService.RemoteSyncServiceBase
{
/// <summary>
/// RPC for getting a stream of media syncs
/// </summary>
/// <param name="request"></param>
/// <param name="responseStream"></param>
/// <param name="context"></param>
/// <returns></returns>
public override async Task GetMediaSync(Empty request,
Grpc.Core.IServerStreamWriter<Sync> responseStream,
Grpc.Core.ServerCallContext context)
{
bool songIsPlaying = true;
PlaybackStateChangedEventHandler playbackStateChanged = (sender, e) =>
{
songIsPlaying = false;
};
PlayerService.Instance.PlaybackStateChanged += playbackStateChanged;
while (songIsPlaying)
{
DateTime time = Utils.TimeUtils.GetNetworkTime();
float position = PlayerService.Instance.CurrentMediaTime;
float length = PlayerService.Instance.CurrentMediaLength;
float trackTime = length * position;
Sync sync = new Sync()
{
TrackTime = trackTime,
ServerTime = time.Ticks
};
await responseStream.WriteAsync(sync);
await Task.Delay(10000);
}
}
}
}

View File

@ -4,7 +4,8 @@ using System.Threading;
using Grpc.Core; using Grpc.Core;
using Aurora.Proto.Events; using Aurora.Proto.Events;
using Aurora.Proto.Party; using Aurora.Proto.Party;
using Aurora.Services.ClientService; using Aurora.Proto.Playback;
using Aurora.Proto.Sync;
namespace Aurora.Services.ClientService namespace Aurora.Services.ClientService
{ {
@ -12,6 +13,9 @@ namespace Aurora.Services.ClientService
{ {
private RemotePartyService.RemotePartyServiceClient _remotePartyClient; private RemotePartyService.RemotePartyServiceClient _remotePartyClient;
private RemoteEventService.RemoteEventServiceClient _remoteEventsClient; private RemoteEventService.RemoteEventServiceClient _remoteEventsClient;
private RemotePlaybackService.RemotePlaybackServiceClient _remotePlaybackClient;
private RemoteSyncService.RemoteSyncServiceClient _remoteSyncClient;
private Channel _channel; private Channel _channel;
CancellationTokenSource _eventCancellationTokenSource; CancellationTokenSource _eventCancellationTokenSource;
@ -34,6 +38,16 @@ namespace Aurora.Services.ClientService
get { return _remoteEventsClient; } get { return _remoteEventsClient; }
} }
public RemotePlaybackService.RemotePlaybackServiceClient RemotePlaybackClient
{
get { return _remotePlaybackClient; }
}
public RemoteSyncService.RemoteSyncServiceClient RemoteSyncClient
{
get { return _remoteSyncClient; }
}
public bool IsStarted public bool IsStarted
{ {
get get
@ -51,6 +65,8 @@ namespace Aurora.Services.ClientService
_remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel); _remotePartyClient = new RemotePartyService.RemotePartyServiceClient(_channel);
_remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel); _remoteEventsClient = new RemoteEventService.RemoteEventServiceClient(_channel);
_remotePlaybackClient = new RemotePlaybackService.RemotePlaybackServiceClient(_channel);
_remoteSyncClient = new RemoteSyncService.RemoteSyncServiceClient(_channel);
//Assign but don't start task //Assign but don't start task
_eventCancellationTokenSource = new CancellationTokenSource(); _eventCancellationTokenSource = new CancellationTokenSource();
@ -63,6 +79,8 @@ namespace Aurora.Services.ClientService
_remotePartyClient = null; _remotePartyClient = null;
_remoteEventsClient = null; _remoteEventsClient = null;
_remotePlaybackClient = null;
_remoteSyncClient = null;
} }
/// <summary> /// <summary>

View File

@ -1,6 +1,9 @@
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Threading;
using Grpc.Core;
using Aurora.Models.Media; using Aurora.Models.Media;
using Aurora.Proto.Sync;
using LibVLCSharp.Shared; using LibVLCSharp.Shared;
namespace Aurora.Services.PlayerService namespace Aurora.Services.PlayerService
@ -47,6 +50,22 @@ namespace Aurora.Services.PlayerService
return _currentMedia == media; return _currentMedia == media;
} }
public float CurrentMediaTime
{
get
{
return _mediaPlayer.Position;
}
}
public long CurrentMediaLength
{
get
{
return _mediaPlayer.Length;
}
}
/// <summary> /// <summary>
/// Load media into the media player. /// Load media into the media player.
/// </summary> /// </summary>
@ -78,7 +97,50 @@ namespace Aurora.Services.PlayerService
{ {
PlaybackState oldState = _state; PlaybackState oldState = _state;
_state = PlaybackState.Playing; _state = PlaybackState.Playing;
_mediaPlayer.Play(); _mediaPlayer.Play();
//Use sync RPC for remote audio
if (_currentMedia is RemoteAudio)
{
RemoteAudio media = _currentMedia as RemoteAudio;
RemoteSyncService.RemoteSyncServiceClient _remoteSyncClient = media.RemoteSyncClient;
//Sync playback in a separate task
//Task completes when host stops syncing (when a song is complete)
Task syncTask = new Task(async () =>
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
using (AsyncServerStreamingCall<Sync> syncStream = _remoteSyncClient
.GetMediaSync(new Proto.General.Empty()))
{
try
{
while (await syncStream.ResponseStream.MoveNext(cancellationTokenSource.Token))
{
Sync sync = new Sync(syncStream.ResponseStream.Current);
if (sync != null)
{
//Adjust position based on sync
DateTime localTime = Utils.TimeUtils.GetNetworkTime();
//Get offset converted to milliseconds
float offset = ((localTime.Ticks - sync.ServerTime) * 100) / (1000 * 1000);
float length = CurrentMediaLength;
float position = (sync.TrackTime + offset) / length;
_mediaPlayer.Position = position;
}
}
}
catch (Exception ex)
{
Console.WriteLine("Exception caught while attempting to sync: " + ex.Message);
}
}
});
syncTask.Start();
}
if (PlaybackStateChanged != null) if (PlaybackStateChanged != null)
{ {

View File

@ -6,6 +6,8 @@ using Grpc.Core;
using Aurora.RemoteImpl; using Aurora.RemoteImpl;
using Aurora.Proto.Events; using Aurora.Proto.Events;
using Aurora.Proto.Party; using Aurora.Proto.Party;
using Aurora.Proto.Playback;
using Aurora.Proto.Sync;
namespace Aurora.Services namespace Aurora.Services
@ -17,8 +19,10 @@ namespace Aurora.Services
private Grpc.Core.Server _server; private Grpc.Core.Server _server;
//Implementation class declarations //Implementation class declarations
RemotePartyServiceImpl _remotePartyServiceImpl; private RemotePartyServiceImpl _remotePartyServiceImpl;
RemoteEventServiceImpl _remoteEventImpl; private RemoteEventServiceImpl _remoteEventImpl;
private RemotePlaybackServiceImpl _remotePlaybackImpl;
private RemoteSyncServiceImpl _remoteSyncImpl;
/// <summary> /// <summary>
/// Constructor. Registers GRPC service implementations. /// Constructor. Registers GRPC service implementations.
@ -76,10 +80,14 @@ namespace Aurora.Services
//Construct implementations //Construct implementations
_remotePartyServiceImpl = new RemotePartyServiceImpl(); _remotePartyServiceImpl = new RemotePartyServiceImpl();
_remoteEventImpl = new RemoteEventServiceImpl(); _remoteEventImpl = new RemoteEventServiceImpl();
_remotePlaybackImpl = new RemotePlaybackServiceImpl();
_remoteSyncImpl = new RemoteSyncServiceImpl();
// Register grpc RemoteService with singleton server service // Register grpc RemoteService with singleton server service
RegisterService(RemotePartyService.BindService(_remotePartyServiceImpl)); RegisterService(RemotePartyService.BindService(_remotePartyServiceImpl));
RegisterService(RemoteEventService.BindService(_remoteEventImpl)); RegisterService(RemoteEventService.BindService(_remoteEventImpl));
RegisterService(RemotePlaybackService.BindService(_remotePlaybackImpl));
RegisterService(RemoteSyncService.BindService(_remoteSyncImpl));
} }
_server.Start(); _server.Start();
} }

69
Aurora/Utils/TimeUtils.cs Normal file
View File

@ -0,0 +1,69 @@
using System;
using System.Net;
using System.Net.Sockets;
namespace Aurora.Utils
{
public static class TimeUtils
{
public static DateTime GetNetworkTime()
{
//default Windows time server
const string ntpServer = "time.windows.com";
// NTP message size - 16 bytes of the digest (RFC 2030)
var ntpData = new byte[48];
//Setting the Leap Indicator, Version Number and Mode values
ntpData[0] = 0x1B; //LI = 0 (no warning), VN = 3 (IPv4 only), Mode = 3 (Client Mode)
var addresses = Dns.GetHostEntry(ntpServer).AddressList;
//The UDP port number assigned to NTP is 123
var ipEndPoint = new IPEndPoint(addresses[0], 123);
//NTP uses UDP
using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp))
{
socket.Connect(ipEndPoint);
//Stops code hang if NTP is blocked
socket.ReceiveTimeout = 3000;
socket.Send(ntpData);
socket.Receive(ntpData);
socket.Close();
}
//Offset to get to the "Transmit Timestamp" field (time at which the reply
//departed the server for the client, in 64-bit timestamp format."
const byte serverReplyTime = 40;
//Get the seconds part
ulong intPart = BitConverter.ToUInt32(ntpData, serverReplyTime);
//Get the seconds fraction
ulong fractPart = BitConverter.ToUInt32(ntpData, serverReplyTime + 4);
//Convert From big-endian to little-endian
intPart = SwapEndianness(intPart);
fractPart = SwapEndianness(fractPart);
var milliseconds = (intPart * 1000) + ((fractPart * 1000) / 0x100000000L);
//**UTC** time
var networkDateTime = (new DateTime(1900, 1, 1, 0, 0, 0, DateTimeKind.Utc)).AddMilliseconds((long)milliseconds);
return networkDateTime.ToLocalTime();
}
// stackoverflow.com/a/3294698/162671
static uint SwapEndianness(ulong x)
{
return (uint)(((x & 0x000000ff) << 24) +
((x & 0x0000ff00) << 8) +
((x & 0x00ff0000) >> 8) +
((x & 0xff000000) >> 24));
}
}
}