Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC: attach acks to pad messages (halve number of messages) #286

Open
wants to merge 1 commit into
base: slippi
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Source/Core/Core/NetPlayProto.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ enum
NP_MSG_SLIPPI_MATCH_SELECTIONS = 0x82,
NP_MSG_SLIPPI_CONN_SELECTED = 0x83,
NP_MSG_SLIPPI_CHAT_MESSAGE = 0x84,
NP_MSG_SLIPPI_COMPOSITE = 0x85,

NP_MSG_START_GAME = 0xA0,
NP_MSG_CHANGE_GAME = 0xA1,
Expand Down
129 changes: 116 additions & 13 deletions Source/Core/Core/Slippi/SlippiNetplay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

static std::mutex pad_mutex;
static std::mutex ack_mutex;
static std::mutex ackQueue_mutex; //TODO Check usefulness of ack_mutex

SlippiNetplayClient *SLIPPI_NETPLAY = nullptr;

Expand Down Expand Up @@ -176,6 +177,26 @@ u8 SlippiNetplayClient::LocalPlayerPort()
return this->playerIdx;
}

void appendToPacket(sf::Packet &packet,
std::pair<std::chrono::high_resolution_clock::time_point, sf::Packet> outgoingAck)
{
auto tp = outgoingAck.first;
auto aspac = outgoingAck.second;

// Add the time difference in microseconds between when the ack would've normally departed
// and now i.e this message's departure time to the packet
// The receiver will substract it from the computed ping

uint32_t departureDelayUs = std::lround((std::chrono::high_resolution_clock::now() - tp).count()/1000.);
aspac << departureDelayUs;

size_t aspacDataSize = aspac.getDataSize();

packet << (u8)((aspacDataSize & 0xFF00) >> 8);
packet << (u8)(aspacDataSize & 0x00FF);
packet.append(aspac.getData(), aspacDataSize);
}

// called from ---NETPLAY--- thread
unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
{
Expand All @@ -190,6 +211,7 @@ unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
{
case NP_MSG_SLIPPI_PAD:
{
// INFO_LOG(SLIPPI_ONLINE, "Received pad packet");
// Fetch current time immediately for the most accurate timing calculations
u64 curTime = Common::Timer::GetTimeUs();

Expand Down Expand Up @@ -310,17 +332,38 @@ unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
}
}

// Send Ack
sf::Packet spac;
spac << (MessageId)NP_MSG_SLIPPI_PAD_ACK;
spac << frame;
spac << playerIdx;
// INFO_LOG(SLIPPI_ONLINE, "Sending ack packet for frame %d (player %d) to peer at %d:%d", frame,
// packetPlayerPort,
// peer->address.host, peer->address.port);

ENetPacket *epac = enet_packet_create(spac.getData(), spac.getDataSize(), ENET_PACKET_FLAG_UNSEQUENCED);
int sendResult = enet_peer_send(peer, 2, epac);
// Construct Ack
if (hasGameStarted)
{
sf::Packet spac;
spac << (MessageId)NP_MSG_SLIPPI_PAD_ACK;
spac << frame;
spac << playerIdx;
// INFO_LOG(SLIPPI_ONLINE, "Building ack packet for frame %d (player %d) to peer at %d:%d", frame,
// packetPlayerPort, peer->address.host, peer->address.port);

{
std::lock_guard<std::mutex> lk(ackQueue_mutex);
outgoingAcksQueue.push_back({std::chrono::high_resolution_clock::now(), spac});
if (outgoingAcksQueue.size() >= 5) // Flush the ack queue if size >= 5
{
ERROR_LOG(SLIPPI_ONLINE, "Ack queue flushed");

sf::Packet cpac;
cpac << (MessageId)NP_MSG_SLIPPI_COMPOSITE;
for (auto outgoingAck : outgoingAcksQueue)
{
appendToPacket(cpac, outgoingAcksQueue.front());
}
outgoingAcksQueue.clear();

ENetPacket *epac =
enet_packet_create(cpac.getData(), cpac.getDataSize(), ENET_PACKET_FLAG_UNSEQUENCED);
int sendResult = enet_peer_send(peer, 2, epac);
}
}

}
}
break;

Expand All @@ -335,6 +378,8 @@ unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
ERROR_LOG(SLIPPI_ONLINE, "Ack packet too small to read frame");
break;
}
// INFO_LOG(SLIPPI_ONLINE, "Received ack packet for frame %d", frame);

u8 packetPlayerPort;
if (!(packet >> packetPlayerPort))
{
Expand All @@ -348,6 +393,13 @@ unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
break;
}

u32 departureDelayUs;
if (!(packet >> departureDelayUs))
{
ERROR_LOG(SLIPPI_ONLINE, "Got ack packet with missing departure delay for frame %d", frame);
break;
}

// INFO_LOG(SLIPPI_ONLINE, "Received ack packet from player %d(%d) [%d]...", packetPlayerPort, pIdx, frame);

lastFrameAcked[pIdx] = frame > lastFrameAcked[pIdx] ? frame : lastFrameAcked[pIdx];
Expand All @@ -367,7 +419,7 @@ unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
auto sendTime = ackTimers[pIdx].Front().timeUs;
ackTimers[pIdx].Pop();

pingUs[pIdx] = Common::Timer::GetTimeUs() - sendTime;
pingUs[pIdx] = Common::Timer::GetTimeUs() - sendTime - departureDelayUs;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't, ping is just calculated as (arrival of ack - departure of pad - departure delay communicated) instead of (arrival of ack - departure of pad). It should be transparent to the rest of the world.
The ping information will be slightly older though ( < 1f ) since the required info is sent with the next pad message instead of being sent instantly.

if (g_ActiveConfig.bShowNetPlayPing && frame % SLIPPI_PING_DISPLAY_INTERVAL == 0 && pIdx == 0)
{
std::stringstream pingDisplay;
Expand All @@ -382,6 +434,22 @@ unsigned int SlippiNetplayClient::OnData(sf::Packet &packet, ENetPeer *peer)
}
break;

case NP_MSG_SLIPPI_COMPOSITE:
{
// INFO_LOG(SLIPPI_ONLINE, "Received composite packet");
// [composite message type ID] ([length (2 bytes)] [message type ID] rest of message)*
size_t index = 1;
while (packet.getDataSize() > index)
{
size_t length = ((size_t)(((u8 *)packet.getData())[index])<<8) + (size_t)(((u8 *)packet.getData())[index+1]);
sf::Packet subPacket = sf::Packet();
subPacket.append((char *)packet.getData() + index + 2, length);
OnData(subPacket, peer);
index += length + 2;
}
}
break;

case NP_MSG_SLIPPI_MATCH_SELECTIONS:
{
auto s = readSelectionsFromPacket(packet);
Expand Down Expand Up @@ -898,6 +966,10 @@ void SlippiNetplayClient::StartSlippiGame()
hasGameStarted = false;

localPadQueue.clear();
{
std::lock_guard<std::mutex> lk(ackQueue_mutex);
outgoingAcksQueue.clear();
}

for (int i = 0; i < m_remotePlayerCount; i++)
{
Expand Down Expand Up @@ -955,6 +1027,7 @@ void SlippiNetplayClient::SendSlippiPad(std::unique_ptr<SlippiPad> pad)
}
// INFO_LOG(SLIPPI_ONLINE, "Checking to drop local inputs, oldest frame: %d | minAckFrame: %d | %d, %d, %d",
// localPadQueue.back()->frame, minAckFrame, lastFrameAcked[0], lastFrameAcked[1], lastFrameAcked[2]);

while (!localPadQueue.empty() && localPadQueue.back()->frame < minAckFrame)
{
// INFO_LOG(SLIPPI_ONLINE, "Dropping local input for frame %d from queue", localPadQueue.back()->frame);
Expand Down Expand Up @@ -984,8 +1057,38 @@ void SlippiNetplayClient::SendSlippiPad(std::unique_ptr<SlippiPad> pad)
spac->append((*it)->padBuf, SLIPPI_PAD_DATA_SIZE); // only transfer 8 bytes per pad
}

SendAsync(std::move(spac));
// If one more outgoing acks are waiting, make a composite packet
// Allow up to 2 ack packets in one departure so we can't permanently get in a situation where acks are lagging behind

{
std::lock_guard<std::mutex> lk(ackQueue_mutex);
if (outgoingAcksQueue.size() > 0)
{
// INFO_LOG(SLIPPI_ONLINE, "Appending an ack to the pad message for frame %d", frame);

auto cspac = std::make_unique<sf::Packet>();

*cspac << static_cast<MessageId>(NP_MSG_SLIPPI_COMPOSITE);

size_t spacDataSize = spac->getDataSize();
*cspac << (u8)((spacDataSize & 0xFF00) >> 8);
*cspac << (u8)(spacDataSize & 0x00FF);
cspac->append(spac->getData(), spacDataSize);

for (int i = 0; i < std::min((int)outgoingAcksQueue.size(), 2); i++)
{
appendToPacket(*cspac, outgoingAcksQueue.front());
outgoingAcksQueue.pop_front();
}

SendAsync(std::move(cspac));
}
else
{
SendAsync(std::move(spac));
}
}

u64 time = Common::Timer::GetTimeUs();

hasGameStarted = true;
Expand Down
2 changes: 2 additions & 0 deletions Source/Core/Core/Slippi/SlippiNetplay.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ class SlippiNetplayClient
std::deque<std::unique_ptr<SlippiPad>>
remotePadQueue[SLIPPI_REMOTE_PLAYER_MAX]; // most recent inputs at start of deque

std::deque<std::pair<std::chrono::high_resolution_clock::time_point, sf::Packet>> outgoingAcksQueue;

u64 pingUs[SLIPPI_REMOTE_PLAYER_MAX];
int32_t lastFrameAcked[SLIPPI_REMOTE_PLAYER_MAX];
FrameOffsetData frameOffsetData[SLIPPI_REMOTE_PLAYER_MAX];
Expand Down