deduplicate connection logic

This commit is contained in:
Vinzenz Schroeter 2024-05-04 14:25:37 +02:00
parent f477d1e5de
commit fa8a723ff9
9 changed files with 108 additions and 113 deletions

View file

@ -1,7 +1,8 @@
import {useEffect, useRef} from 'react'; import {useEffect, useRef, useState} from 'react';
import './ClientScreen.css'; import './ClientScreen.css';
import {hslToString, Theme} from './theme.ts'; import {hslToString, Theme} from './theme.ts';
import {makeApiUrl, useMyWebSocket} from './serverCalls.tsx'; import {makeApiUrl, useMyWebSocket} from './serverCalls.tsx';
import {ReadyState} from 'react-use-websocket';
const pixelsPerRow = 352; const pixelsPerRow = 352;
const pixelsPerCol = 160; const pixelsPerCol = 160;
@ -101,6 +102,7 @@ export default function ClientScreen({theme, player}: {
player: string | null player: string | null
}) { }) {
const canvasRef = useRef<HTMLCanvasElement>(null); const canvasRef = useRef<HTMLCanvasElement>(null);
const [shouldSendMessage, setShouldSendMessage] = useState(false);
const url = makeApiUrl('/screen', 'ws'); const url = makeApiUrl('/screen', 'ws');
if (player && player !== '') if (player && player !== '')
@ -109,13 +111,24 @@ export default function ClientScreen({theme, player}: {
const { const {
lastMessage, lastMessage,
sendMessage, sendMessage,
getWebSocket getWebSocket,
} = useMyWebSocket(url.toString(), {}); readyState
} = useMyWebSocket(url.toString(), {
onOpen: _ => setShouldSendMessage(true)
});
const socket = getWebSocket(); const socket = getWebSocket();
if (socket) if (socket)
(socket as WebSocket).binaryType = 'arraybuffer'; (socket as WebSocket).binaryType = 'arraybuffer';
useEffect(() => {
if (!shouldSendMessage || readyState !== ReadyState.OPEN)
return;
setShouldSendMessage(false);
sendMessage('');
}, [readyState, shouldSendMessage]);
useEffect(() => { useEffect(() => {
if (lastMessage === null) if (lastMessage === null)
return; return;
@ -155,7 +168,7 @@ export default function ClientScreen({theme, player}: {
if (ignore) if (ignore)
return; return;
sendMessage(''); setShouldSendMessage(true);
}; };
start(); start();

View file

@ -28,7 +28,6 @@ type TankInfo = {
readonly moving: boolean; readonly moving: boolean;
} }
type PlayerInfoMessage = { type PlayerInfoMessage = {
readonly name: string; readonly name: string;
readonly scores: Scores; readonly scores: Scores;
@ -48,7 +47,8 @@ export default function PlayerInfo({player}: { player: string }) {
readyState, readyState,
sendMessage sendMessage
} = useMyWebSocket<PlayerInfoMessage>(url.toString(), { } = useMyWebSocket<PlayerInfoMessage>(url.toString(), {
onMessage: () => setShouldSendMessage(true) onMessage: () => setShouldSendMessage(true),
onOpen: _ => setShouldSendMessage(true)
}); });
useEffect(() => { useEffect(() => {

View file

@ -19,10 +19,10 @@ export type Player = {
readonly scores: Scores; readonly scores: Scores;
}; };
export function useMyWebSocket<T = unknown>(url: string, options: Options) { export function useMyWebSocket<T = unknown>(url: string, options: Options = {}) {
return useWebSocket<T>(url, { return useWebSocket<T>(url, {
shouldReconnect: () => true, shouldReconnect: () => true,
reconnectAttempts: 5, reconnectAttempts: 2,
onReconnectStop: () => alert('server connection failed. please reload.'), onReconnectStop: () => alert('server connection failed. please reload.'),
...options ...options
}); });

View file

@ -1,21 +1,16 @@
using System.Buffers; using System.Buffers;
using System.Diagnostics;
using System.Net.WebSockets; using System.Net.WebSockets;
using DisplayCommands; using DisplayCommands;
using DotNext.Threading;
using TanksServer.Graphics; using TanksServer.Graphics;
namespace TanksServer.Interactivity; namespace TanksServer.Interactivity;
internal sealed class ClientScreenServerConnection internal sealed class ClientScreenServerConnection
: WebsocketServerConnection, IDisposable : DroppablePackageRequestConnection<ClientScreenServerConnection.Package>
{ {
private readonly BufferPool _bufferPool; private readonly BufferPool _bufferPool;
private readonly PlayerScreenData? _playerDataBuilder; private readonly PlayerScreenData? _playerDataBuilder;
private readonly Player? _player; private readonly Player? _player;
private readonly AsyncAutoResetEvent _nextPackageEvent = new(false, 1);
private int _runningMessageHandlers = 0;
private Package? _next;
public ClientScreenServerConnection( public ClientScreenServerConnection(
WebSocket webSocket, WebSocket webSocket,
@ -32,47 +27,11 @@ internal sealed class ClientScreenServerConnection
: new PlayerScreenData(logger, player); : new PlayerScreenData(logger, player);
} }
protected override ValueTask HandleMessageAsync(Memory<byte> _)
{
if (Interlocked.Increment(ref _runningMessageHandlers) == 1)
return Core();
Interlocked.Decrement(ref _runningMessageHandlers);
return ValueTask.CompletedTask;
async ValueTask Core()
{
await _nextPackageEvent.WaitAsync();
var package = Interlocked.Exchange(ref _next, null);
if (package == null)
throw new UnreachableException("package should be set here");
await SendAndDisposeAsync(package);
Interlocked.Decrement(ref _runningMessageHandlers);
}
}
public async Task OnGameTickAsync(PixelGrid pixels, GamePixelGrid gamePixelGrid) public async Task OnGameTickAsync(PixelGrid pixels, GamePixelGrid gamePixelGrid)
{ {
await Task.Yield(); await Task.Yield();
var next = BuildNextPackage(pixels, gamePixelGrid); var next = BuildNextPackage(pixels, gamePixelGrid);
var oldNext = Interlocked.Exchange(ref _next, next); SetNextPackage(next);
_nextPackageEvent.Set();
oldNext?.Dispose();
}
public override ValueTask RemovedAsync()
{
_player?.DecrementConnectionCount();
return ValueTask.CompletedTask;
}
public void Dispose()
{
_nextPackageEvent.Dispose();
Interlocked.Exchange(ref _next, null)?.Dispose();
} }
private Package BuildNextPackage(PixelGrid pixels, GamePixelGrid gamePixelGrid) private Package BuildNextPackage(PixelGrid pixels, GamePixelGrid gamePixelGrid)
@ -80,19 +39,17 @@ internal sealed class ClientScreenServerConnection
var nextPixels = _bufferPool.Rent(pixels.Data.Length); var nextPixels = _bufferPool.Rent(pixels.Data.Length);
pixels.Data.CopyTo(nextPixels.Memory); pixels.Data.CopyTo(nextPixels.Memory);
IMemoryOwner<byte>? nextPlayerData = null; if (_playerDataBuilder == null)
if (_playerDataBuilder != null) return new Package(nextPixels, null);
{
var data = _playerDataBuilder.Build(gamePixelGrid);
nextPlayerData = _bufferPool.Rent(data.Length);
data.CopyTo(nextPlayerData.Memory);
}
var next = new Package(nextPixels, nextPlayerData); var data = _playerDataBuilder.Build(gamePixelGrid);
return next; var nextPlayerData = _bufferPool.Rent(data.Length);
data.CopyTo(nextPlayerData.Memory);
return new Package(nextPixels, nextPlayerData);
} }
private async ValueTask SendAndDisposeAsync(Package package) protected override async ValueTask SendPackageAsync(Package package)
{ {
try try
{ {
@ -104,13 +61,15 @@ internal sealed class ClientScreenServerConnection
{ {
Logger.LogWarning(ex, "send failed"); Logger.LogWarning(ex, "send failed");
} }
finally
{
package.Dispose();
}
} }
private sealed record class Package( public override void Dispose()
{
base.Dispose();
_player?.DecrementConnectionCount();
}
internal sealed record class Package(
IMemoryOwner<byte> Pixels, IMemoryOwner<byte> Pixels,
IMemoryOwner<byte>? PlayerData IMemoryOwner<byte>? PlayerData
) : IDisposable ) : IDisposable

View file

@ -69,9 +69,5 @@ internal sealed class ControlsServerConnection : WebsocketServerConnection
return ValueTask.CompletedTask; return ValueTask.CompletedTask;
} }
public override ValueTask RemovedAsync() public override void Dispose() => _player.DecrementConnectionCount();
{
_player.DecrementConnectionCount();
return ValueTask.CompletedTask;
}
} }

View file

@ -0,0 +1,50 @@
using System.Diagnostics;
using DotNext.Threading;
namespace TanksServer.Interactivity;
internal abstract class DroppablePackageRequestConnection<TPackage>(
ILogger logger,
ByteChannelWebSocket socket
) : WebsocketServerConnection(logger, socket), IDisposable
where TPackage : class, IDisposable
{
private readonly AsyncAutoResetEvent _nextPackageEvent = new(false, 1);
private int _runningMessageHandlers = 0;
private TPackage? _next;
protected override ValueTask HandleMessageAsync(Memory<byte> _)
{
if (Interlocked.Increment(ref _runningMessageHandlers) == 1)
return Core();
// client has requested multiple frames, ignoring duplicate requests
Interlocked.Decrement(ref _runningMessageHandlers);
return ValueTask.CompletedTask;
async ValueTask Core()
{
await _nextPackageEvent.WaitAsync();
var package = Interlocked.Exchange(ref _next, null);
if (package == null)
throw new UnreachableException("package should be set here");
await SendPackageAsync(package);
Interlocked.Decrement(ref _runningMessageHandlers);
}
}
protected void SetNextPackage(TPackage next)
{
var oldNext = Interlocked.Exchange(ref _next, next);
_nextPackageEvent.Set();
oldNext?.Dispose();
}
protected abstract ValueTask SendPackageAsync(TPackage package);
public override void Dispose()
{
_nextPackageEvent.Dispose();
Interlocked.Exchange(ref _next, null)?.Dispose();
}
}

View file

@ -6,18 +6,14 @@ using TanksServer.GameLogic;
namespace TanksServer.Interactivity; namespace TanksServer.Interactivity;
// MemoryStream is IDisposable but does not need to be disposed internal sealed class PlayerInfoConnection
#pragma warning disable CA1001 : DroppablePackageRequestConnection<IMemoryOwner<byte>>
internal sealed class PlayerInfoConnection : WebsocketServerConnection
#pragma warning restore CA1001
{ {
private readonly Player _player; private readonly Player _player;
private readonly MapEntityManager _entityManager; private readonly MapEntityManager _entityManager;
private readonly BufferPool _bufferPool; private readonly BufferPool _bufferPool;
private readonly MemoryStream _tempStream = new(); private readonly MemoryStream _tempStream = new();
private int _wantsInfoOnTick = 1;
private IMemoryOwner<byte>? _lastMessage = null; private IMemoryOwner<byte>? _lastMessage = null;
private IMemoryOwner<byte>? _nextMessage = null;
public PlayerInfoConnection( public PlayerInfoConnection(
Player player, Player player,
@ -33,47 +29,22 @@ internal sealed class PlayerInfoConnection : WebsocketServerConnection
_player.IncrementConnectionCount(); _player.IncrementConnectionCount();
} }
protected override ValueTask HandleMessageAsync(Memory<byte> buffer)
{
var next = Interlocked.Exchange(ref _nextMessage, null);
if (next != null)
return SendAndDisposeAsync(next);
_wantsInfoOnTick = 1;
return ValueTask.CompletedTask;
}
public async Task OnGameTickAsync() public async Task OnGameTickAsync()
{ {
await Task.Yield(); await Task.Yield();
var response = await GenerateMessageAsync(); var response = await GenerateMessageAsync();
if (response != null)
var shouldDropPacket = _lastMessage != null && response.Memory.Span.SequenceEqual(_lastMessage.Memory.Span); SetNextPackage(response);
if (shouldDropPacket)
{
response.Dispose();
return;
}
var wantsNow = Interlocked.Exchange(ref _wantsInfoOnTick, 0) != 0;
if (wantsNow)
{
await SendAndDisposeAsync(response);
return;
}
Interlocked.Exchange(ref _nextMessage, response);
} }
public override ValueTask RemovedAsync() public override void Dispose()
{ {
base.Dispose();
_player.DecrementConnectionCount(); _player.DecrementConnectionCount();
return ValueTask.CompletedTask;
} }
private async ValueTask<IMemoryOwner<byte>> GenerateMessageAsync() private async ValueTask<IMemoryOwner<byte>?> GenerateMessageAsync()
{ {
var tank = _entityManager.GetCurrentTankOfPlayer(_player); var tank = _entityManager.GetCurrentTankOfPlayer(_player);
@ -97,12 +68,18 @@ internal sealed class PlayerInfoConnection : WebsocketServerConnection
var messageLength = (int)_tempStream.Position; var messageLength = (int)_tempStream.Position;
var owner = _bufferPool.Rent(messageLength); var owner = _bufferPool.Rent(messageLength);
_tempStream.Position = 0; _tempStream.Position = 0;
await _tempStream.ReadExactlyAsync(owner.Memory); await _tempStream.ReadExactlyAsync(owner.Memory);
return owner;
if (_lastMessage == null || !owner.Memory.Span.SequenceEqual(_lastMessage.Memory.Span))
return owner;
owner.Dispose();
return null;
} }
private async ValueTask SendAndDisposeAsync(IMemoryOwner<byte> data) protected override async ValueTask SendPackageAsync(IMemoryOwner<byte> data)
{ {
await Socket.SendTextAsync(data.Memory); await Socket.SendTextAsync(data.Memory);
Interlocked.Exchange(ref _lastMessage, data)?.Dispose(); Interlocked.Exchange(ref _lastMessage, data)?.Dispose();

View file

@ -37,7 +37,7 @@ internal abstract class WebsocketServer<T>(
await connection.ReceiveAsync(); await connection.ReceiveAsync();
_ = _connections.TryRemove(connection, out _); _ = _connections.TryRemove(connection, out _);
await connection.RemovedAsync(); connection.Dispose();
} }
public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

View file

@ -3,7 +3,7 @@ namespace TanksServer.Interactivity;
internal abstract class WebsocketServerConnection( internal abstract class WebsocketServerConnection(
ILogger logger, ILogger logger,
ByteChannelWebSocket socket ByteChannelWebSocket socket
) ): IDisposable
{ {
protected readonly ByteChannelWebSocket Socket = socket; protected readonly ByteChannelWebSocket Socket = socket;
protected readonly ILogger Logger = logger; protected readonly ILogger Logger = logger;
@ -21,7 +21,7 @@ internal abstract class WebsocketServerConnection(
Logger.LogTrace("done receiving"); Logger.LogTrace("done receiving");
} }
public abstract ValueTask RemovedAsync();
protected abstract ValueTask HandleMessageAsync(Memory<byte> buffer); protected abstract ValueTask HandleMessageAsync(Memory<byte> buffer);
public abstract void Dispose();
} }