28 #ifdef USE_THREADED_SEND
35 using namespace SLNet;
/// Rounds \a d up to the nearest integral value (ceiling), returned as a double.
///
/// \param d Value to round.
/// \return The smallest integral value that is not less than \a d. Values that
///         are already integral, as well as NaN and +/-infinity, are returned
///         unchanged.
double Ceil(double d)
{
	// Every finite double whose magnitude is at least 2^52 is already integral.
	// Returning early here also keeps the integer conversion below within the
	// range of long long (an out-of-range float-to-integer conversion is
	// undefined behavior). The negated comparison is deliberately written so
	// that NaN, which fails every ordered comparison, takes this branch too.
	const double integralThreshold = 4503599627370496.0; // 2^52
	if (!(d > -integralThreshold && d < integralThreshold))
		return d;

	// Float-to-integer conversion truncates toward zero.
	const double truncated = static_cast<double>(static_cast<long long>(d));
	if (truncated == d)
		return d;

	// Positive non-integers were truncated downwards, so the ceiling is one
	// above. Negative non-integers were truncated upwards, which already is the
	// ceiling; adding one there would overshoot (e.g. -1.5 must yield -1, not 0).
	return (d > 0.0) ? truncated + 1.0 : truncated;
}
49 #if CC_TIME_TYPE_BYTES==4
51 static const CCTimeType HISTOGRAM_RESTART_CYCLE=10000;
64 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
65 static unsigned int packetNumber=0;
101 #if CC_TIME_TYPE_BYTES==8
113 struct DatagramHeaderFormat
115 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
127 bool isContinuousSend;
131 static BitSize_t GetDataHeaderBitLength()
136 static unsigned int GetDataHeaderByteLength()
140 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
141 sizeof(RakNetTimeMS) +
159 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
177 b->
Write(isPacketPair);
178 b->
Write(isContinuousSend);
179 b->
Write(needsBAndAs);
181 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
184 b->
Write(datagramNumber);
201 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
219 b->
Read(isPacketPair);
220 b->
Read(isContinuousSend);
221 b->
Read(needsBAndAs);
223 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
226 b->
Read(datagramNumber);
240 #if defined (DEBUG_SPLIT_PACKET_PROBLEMS)
241 static int waitFlag=-1;
244 using namespace SLNet;
248 #if PREALLOCATE_LARGE_MESSAGES==1
249 if (key < data->returnedPacket->splitPacketId)
251 if (key == data->returnedPacket->splitPacketId)
254 if (key < data->splitPacketList[0]->splitPacketId)
293 if (key < data->splitPacketIndex)
315 minExtraPing=extraPingVariance=0;
316 packetloss=(double) minExtraPing;
320 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
323 fopen_s(&fp,
"reliableorderedoutput.txt",
"wt");
327 InitializeVariables();
329 datagramHistoryMessagePool.SetPageSize(
sizeof(MessageNumberNode)*128);
350 InitializeVariables();
352 #if LIBCAT_SECURITY==1
353 useSecurity = _useSecurity;
356 MTUSize -= cat::AuthenticatedEncryption::OVERHEAD_BYTES;
359 #endif // LIBCAT_SECURITY
383 void ReliabilityLayer::InitializeVariables(
void )
389 memset( &statistics, 0,
sizeof( statistics ) );
390 memset( &heapIndexOffsets, 0,
sizeof( heapIndexOffsets ) );
394 elapsedTimeSinceLastUpdate=0;
395 throughputCapCountdown=0;
396 sendReliableMessageNumberIndex = 0;
397 internalOrderIndex=0;
398 timeToNextUnreliableCull=0;
399 unreliableLinkedListHead=0;
401 bandwidthExceededStatistic=
false;
407 countdownToNextPacketPair=15;
409 nextAllowedThroughputSample=0;
410 deadConnection = cheater =
false;
411 timeOfLastContinualSend=0;
419 statistics.messagesInResendBuffer=0;
420 statistics.bytesInResendBuffer=0;
422 receivedPacketsBaseIndex=0;
423 resetReceivedPackets=
true;
424 receivePacketCount=0;
433 nextSendTime=lastUpdateTime;
439 unacknowledgedBytes=0;
440 resendLinkedListHead=0;
441 totalUserDataBytesAcked=0;
443 datagramHistoryPopCount=0;
448 statistics.messageInSendBuffer[i]=0;
449 statistics.bytesInSendBuffer[i]=0.0;
461 void ReliabilityLayer::FreeMemory(
bool freeAllImmediately )
463 (void) freeAllImmediately;
464 FreeThreadSafeMemory();
467 void ReliabilityLayer::FreeThreadSafeMemory(
void )
472 ClearPacketsAndDatagrams();
474 for (i=0; i < splitPacketChannelList.Size(); i++)
476 for (j=0; j < splitPacketChannelList[i]->splitPacketList.Size(); j++)
478 FreeInternalPacketData(splitPacketChannelList[i]->splitPacketList[j],
_FILE_AND_LINE_ );
479 ReleaseToInternalPacketPool( splitPacketChannelList[i]->splitPacketList[j] );
481 #if PREALLOCATE_LARGE_MESSAGES==1
482 if (splitPacketChannelList[i]->returnedPacket)
484 FreeInternalPacketData(splitPacketChannelList[i]->returnedPacket, __FILE__, __LINE__ );
485 ReleaseToInternalPacketPool( splitPacketChannelList[i]->returnedPacket );
492 while ( outputQueue.Size() > 0 )
494 internalPacket = outputQueue.Pop();
496 ReleaseToInternalPacketPool( internalPacket );
527 for (j=0; j < orderingHeaps[i].Size(); j++)
530 ReleaseToInternalPacketPool( orderingHeaps[i][j] );
537 memset(resendBuffer, 0,
sizeof(resendBuffer));
538 statistics.messagesInResendBuffer=0;
539 statistics.bytesInResendBuffer=0;
541 if (resendLinkedListHead)
552 if (iter==resendLinkedListHead)
554 ReleaseToInternalPacketPool(prev);
557 ReleaseToInternalPacketPool(prev);
559 resendLinkedListHead=0;
561 unacknowledgedBytes=0;
565 for ( j=0 ; j < outgoingPacketBuffer.Size(); j++ )
567 if ( outgoingPacketBuffer[ j ]->data)
569 ReleaseToInternalPacketPool( outgoingPacketBuffer[ j ] );
575 for (i = 0; i < delayList.Size(); i++ )
577 delayList.Clear(__FILE__, __LINE__);
586 packetsToSendThisUpdateDatagramBoundaries.Clear(
false,
_FILE_AND_LINE_);
587 packetsToSendThisUpdateDatagramBoundaries.Preallocate(128,
_FILE_AND_LINE_);
608 while (datagramHistory.Size())
610 RemoveFromDatagramHistory(datagramHistoryPopCount);
611 datagramHistory.Pop();
612 datagramHistoryPopCount++;
615 datagramHistoryPopCount=0;
617 acknowlegements.Clear();
620 unreliableLinkedListHead=0;
640 #if CC_TIME_TYPE_BYTES==4
649 if ( length <= 2 || buffer == 0 )
651 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
652 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"length <= 2 || buffer == 0",
BYTES_TO_BITS(length), systemAddress,
true);
664 #if LIBCAT_SECURITY==1
667 unsigned int received = length;
669 if (!auth_enc.Decrypt((cat::u8*)buffer, received))
683 DatagramHeaderFormat dhf;
684 dhf.Deserialize(&socketData);
685 if (dhf.isValid==
false)
687 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
688 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"dhf.isValid==false",
BYTES_TO_BITS(length), systemAddress,
true);
697 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
699 CCTimeType rtt = timeMSLow-dhf.sourceSystemTime;
700 #if CC_TIME_TYPE_BYTES==4
715 if (dhf.hasBAndAS==
false)
724 incomingAcks.Clear();
725 if (incomingAcks.Deserialize(&socketData)==
false)
727 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
728 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"incomingAcks.Deserialize failed",
BYTES_TO_BITS(length), systemAddress,
true);
734 while (k < unreliableWithAckReceiptHistory.Size()) {
735 if (incomingAcks.IsWithinRange(unreliableWithAckReceiptHistory[k].datagramNumber)) {
740 memcpy(ackReceipt->
data +
sizeof(
MessageID), &unreliableWithAckReceiptHistory[k].sendReceiptSerial,
sizeof(
uint32_t));
744 unreliableWithAckReceiptHistory.RemoveAtIndex(k);
751 if (datagramHistory.IsEmpty()) {
752 receivePacketCount++;
756 for (i = 0; i < incomingAcks.ranges.Size(); i++) {
758 RakAssert(incomingAcks.ranges[i].minIndex <= incomingAcks.ranges[i].maxIndex);
760 if (incomingAcks.ranges[i].maxIndex == (
uint24_t)(0xFFFFFFFF)) {
761 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
762 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"incomingAcks maxIndex is max value",
BYTES_TO_BITS(length), systemAddress,
true);
766 for (datagramNumber = incomingAcks.ranges[i].minIndex; datagramNumber <= incomingAcks.ranges[i].maxIndex; datagramNumber++) {
768 if (offsetIntoList >= datagramHistory.Size()) {
770 receivePacketCount++;
775 MessageNumberNode *messageNumberNode = GetMessageNumberNodeByDatagramIndex(datagramNumber, &whenSent);
776 if (messageNumberNode)
779 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
780 congestionManager.OnAck(timeRead, rtt, dhf.hasBAndAS, 0, dhf.AS, totalUserDataBytesAcked, bandwidthExceededStatistic, datagramNumber );
783 if (timeRead>whenSent)
784 ping=timeRead-whenSent;
787 congestionManager.OnAck(timeRead, ping, dhf.hasBAndAS, 0, dhf.AS, totalUserDataBytesAcked, bandwidthExceededStatistic, datagramNumber );
789 while (messageNumberNode)
794 RemovePacketFromResendListAndDeleteOlderReliableSequenced( messageNumberNode->messageNumber, timeRead, messageHandlerList, systemAddress );
795 messageNumberNode=messageNumberNode->next;
798 RemoveFromDatagramHistory(datagramNumber);
813 if (datagramHistory.IsEmpty()) {
814 receivePacketCount++;
822 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
823 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"incomingNAKs.Deserialize failed",
BYTES_TO_BITS(length), systemAddress,
true);
833 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
834 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"incomingNAKs maxIndex is max value",
BYTES_TO_BITS(length), systemAddress,
true);
840 for (messageNumber = incomingNAKs.
ranges[i].minIndex; messageNumber <= incomingNAKs.
ranges[i].maxIndex; messageNumber++)
842 congestionManager.OnNAK(timeRead, messageNumber);
849 if (offsetIntoList >= datagramHistory.Size()) {
851 receivePacketCount++;
856 MessageNumberNode *messageNumberNode = GetMessageNumberNodeByDatagramIndex(messageNumber, &timeSent);
857 while (messageNumberNode)
869 messageNumberNode=messageNumberNode->next;
877 if (!congestionManager.OnGotPacket(dhf.datagramNumber, dhf.isContinuousSend, timeRead, length, &skippedMessageCount))
879 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
880 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"congestionManager.OnGotPacket failed",
BYTES_TO_BITS(length), systemAddress,
true);
884 if (dhf.isPacketPair)
885 congestionManager.OnGotPacketPair(dhf.datagramNumber, length, timeRead);
887 DatagramHeaderFormat dhfNAK;
890 for (skippedMessageOffset=skippedMessageCount; skippedMessageOffset > 0; skippedMessageOffset--)
892 NAKs.Insert(dhf.datagramNumber-skippedMessageOffset);
894 remoteSystemNeedsBAndAS=dhf.needsBAndAs;
898 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
899 SendAcknowledgementPacket( dhf.datagramNumber, dhf.sourceSystemTime);
901 SendAcknowledgementPacket( dhf.datagramNumber, 0);
904 InternalPacket* internalPacket = CreateInternalPacketFromBitStream( &socketData, timeRead );
905 if (internalPacket==0)
907 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
908 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"CreateInternalPacketFromBitStream failed",
BYTES_TO_BITS(length), systemAddress,
true);
913 while ( internalPacket )
915 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
917 #if CC_TIME_TYPE_BYTES==4
918 messageHandlerList[messageHandlerIndex]->OnInternalPacket(internalPacket, receivePacketCount, systemAddress, timeRead,
false);
920 messageHandlerList[messageHandlerIndex]->OnInternalPacket(internalPacket, receivePacketCount, systemAddress, (
SLNet::TimeMS)(timeRead/(
CCTimeType)1000),
false);
928 if (resetReceivedPackets)
931 receivedPacketsBaseIndex=0;
932 resetReceivedPackets=
false;
944 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
945 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"internalPacket->orderingChannel >= NUMBER_OF_ORDERED_STREAMS",
BYTES_TO_BITS(length), systemAddress,
true);
950 ReleaseToInternalPacketPool( internalPacket );
951 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
971 if (hasReceivedPacketQueue.Size())
972 hasReceivedPacketQueue.Pop();
973 ++receivedPacketsBaseIndex;
979 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
980 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"holeCount > typeRange/(DatagramSequenceNumberType) 2",
BYTES_TO_BITS(length), systemAddress,
false);
984 ReleaseToInternalPacketPool( internalPacket );
986 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
988 else if ((
unsigned int) holeCount<hasReceivedPacketQueue.Size())
991 if (hasReceivedPacketQueue[holeCount]!=
false)
993 #ifdef LOG_TRIVIAL_NOTIFICATIONS
994 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
995 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"Higher count pushed to hasReceivedPacketQueue",
BYTES_TO_BITS(length), systemAddress,
false);
999 hasReceivedPacketQueue[holeCount]=
false;
1005 #ifdef LOG_TRIVIAL_NOTIFICATIONS
1006 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1007 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"Duplicate packet ignored",
BYTES_TO_BITS(length), systemAddress,
false);
1012 ReleaseToInternalPacketPool( internalPacket );
1014 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1021 RakAssert(
"Hole count too high. See ReliabilityLayer.h" && 0);
1023 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1024 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"holeCount > 1000000",
BYTES_TO_BITS(length), systemAddress,
true);
1030 ReleaseToInternalPacketPool( internalPacket );
1032 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1035 #ifdef LOG_TRIVIAL_NOTIFICATIONS
1036 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1037 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"Adding to hasReceivedPacketQueue later ordered message",
BYTES_TO_BITS(length), systemAddress,
false);
1046 while ((
unsigned int)(holeCount) > hasReceivedPacketQueue.Size())
1055 while ( hasReceivedPacketQueue.Size()>0 && hasReceivedPacketQueue.Peek()==false )
1057 hasReceivedPacketQueue.Pop();
1058 ++receivedPacketsBaseIndex;
1243 InsertIntoSplitPacketList( internalPacket, timeRead );
1245 internalPacket = BuildPacketFromSplitPacketList( internalPacket->
splitPacketId, timeRead,
1246 s, systemAddress, rnr, updateBitStream);
1248 if ( internalPacket == 0 )
1250 #ifdef LOG_TRIVIAL_NOTIFICATIONS
1251 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1252 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"BuildPacketFromSplitPacketList did not return anything.",
BYTES_TO_BITS(length), systemAddress,
false);
1256 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1260 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1261 unsigned char packetId;
1262 char *type=
"UNDEFINED";
1268 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1272 unsigned int receivedPacketNumber;
1274 unsigned char streamNumber;
1280 bitStream.
Read(receivedTime);
1281 bitStream.
Read(packetId);
1282 bitStream.
Read(receivedPacketNumber);
1283 bitStream.
Read(streamNumber);
1284 bitStream.
Read(reliability);
1289 type=
"UNRELIABLE_SEQUENCED";
1291 type=
"RELIABLE_ORDERED";
1293 type=
"RELIABLE_SEQUENCED";
1310 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1313 fprintf(fp,
"Returning %i, %s by fallthrough. OI=%i. SI=%i.\n", receivedPacketNumber, type, internalPacket->
orderingIndex.
val, internalPacket->
sequencingIndex);
1319 if (receivedPacketNumber<packetNumber)
1323 fprintf(fp,
"Out of order packet from fallthrough! Expecting %i got %i\n", receivedPacketNumber, packetNumber);
1327 packetNumber=receivedPacketNumber+1;
1339 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1342 fprintf(fp,
"Discarding %i, %s late sequenced. OI=%i. SI=%i.\n", receivedPacketNumber, type, internalPacket->
orderingIndex.
val, internalPacket->
sequencingIndex);
1347 #ifdef LOG_TRIVIAL_NOTIFICATIONS
1348 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1349 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"Sequenced rejected: lower than highest known value",
BYTES_TO_BITS(length), systemAddress,
false);
1354 ReleaseToInternalPacketPool( internalPacket );
1356 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1365 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1368 fprintf(fp,
"outputting immediate %i, %s. OI=%i. SI=%i.", receivedPacketNumber, type, internalPacket->
orderingIndex.
val, internalPacket->
sequencingIndex);
1370 fprintf(fp,
"heap empty\n");
1372 fprintf(fp,
"heap head=%i\n", orderingHeaps[internalPacket->
orderingChannel].Peek()->orderingIndex.val);
1374 if (receivedPacketNumber<packetNumber)
1378 fprintf(fp,
"Out of order packet arrived! Expecting %i got %i\n", receivedPacketNumber, packetNumber);
1382 packetNumber=receivedPacketNumber+1;
1395 internalPacket = orderingHeaps[internalPacket->
orderingChannel].Pop(0);
1397 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1400 bitStream2.
Read(receivedTime);
1402 bitStream2.
Read(receivedPacketNumber);
1403 bitStream2.
Read(streamNumber);
1404 bitStream2.
Read(reliability);
1405 char *type=
"UNDEFINED";
1407 type=
"UNRELIABLE_SEQUENCED";
1409 type=
"RELIABLE_ORDERED";
1416 if (receivedPacketNumber<packetNumber)
1420 fprintf(fp,
"Out of order packet from heap! Expecting %i got %i\n", receivedPacketNumber, packetNumber);
1424 packetNumber=receivedPacketNumber+1;
1442 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1461 weight+=(1048576-1);
1464 #ifdef PRINT_TO_FILE_RELIABLE_ORDERED_TEST
1472 #ifdef LOG_TRIVIAL_NOTIFICATIONS
1473 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1474 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"Larger number ordered packet leaving holes",
BYTES_TO_BITS(length), systemAddress,
false);
1478 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1484 ReleaseToInternalPacketPool( internalPacket );
1486 #ifdef LOG_TRIVIAL_NOTIFICATIONS
1487 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1488 messageHandlerList[messageHandlerIndex]->OnReliabilityLayerNotification(
"Rejected older resend",
BYTES_TO_BITS(length), systemAddress,
false);
1492 goto CONTINUE_SOCKET_DATA_PARSE_LOOP;
1507 CONTINUE_SOCKET_DATA_PARSE_LOOP:
1509 internalPacket = CreateInternalPacketFromBitStream( &socketData, timeRead );
1515 receivePacketCount++;
1527 if ( outputQueue.Size() > 0 )
1532 internalPacket = outputQueue.Pop();
1535 *data = internalPacket->
data;
1537 ReleaseToInternalPacketPool( internalPacket );
1559 RakAssert( !( priority > NUMBER_OF_PRIORITIES || priority < 0 ) );
1560 RakAssert( !( orderingChannel >= NUMBER_OF_ORDERED_STREAMS ) );
1564 #if CC_TIME_TYPE_BYTES==4
1576 if ( priority > NUMBER_OF_PRIORITIES || priority < 0 )
1579 if ( orderingChannel >= NUMBER_OF_ORDERED_STREAMS )
1580 orderingChannel = 0;
1582 unsigned int numberOfBytesToSend=(
unsigned int)
BITS_TO_BYTES(numberOfBitsToSend);
1583 if ( numberOfBitsToSend == 0 )
1587 InternalPacket * internalPacket = AllocateFromInternalPacketPool();
1588 if (internalPacket==0)
1600 AllocInternalPacketData(internalPacket, numberOfBytesToSend,
true,
_FILE_AND_LINE_ );
1602 memcpy( internalPacket->
data, data, numberOfBytesToSend );
1608 AllocInternalPacketData(internalPacket, (
unsigned char*) data );
1613 internalPacket->
priority = priority;
1620 unsigned int maxDataSizeBytes = GetMaxDatagramSizeExcludingMessageHeaderBytes() -
BITS_TO_BYTES(GetMaxMessageHeaderLengthBits());
1622 bool splitPacket = numberOfBytesToSend > maxDataSizeBytes;
1650 internalPacket->
orderingIndex = orderedWriteIndex[ orderingChannel ];
1651 internalPacket->
sequencingIndex = sequencedWriteIndex[ orderingChannel ]++;
1671 internalPacket->
orderingIndex = orderedWriteIndex[ orderingChannel ] ++;
1672 sequencedWriteIndex[ orderingChannel ]=0;
1682 SplitPacket( internalPacket );
1688 AddToUnreliableLinkedList(internalPacket);
1694 statistics.messageInSendBuffer[(int)internalPacket->
priority]++;
1704 unsigned bitsPerSecondLimit,
1713 #if CC_TIME_TYPE_BYTES==4
1721 while (delayList.Size())
1723 if (delayList.Peek()->sendTime <= timeMs)
1725 DataAndTime *dat = delayList.Pop();
1729 bsp.
data = (
char*) dat->data;
1730 bsp.
length = dat->length;
1744 if (time <= lastUpdateTime)
1747 lastUpdateTime=time;
1751 CCTimeType timeSinceLastTick = time - lastUpdateTime;
1752 lastUpdateTime=time;
1753 #if CC_TIME_TYPE_BYTES==4
1754 if (timeSinceLastTick>100)
1755 timeSinceLastTick=100;
1757 if (timeSinceLastTick>100000)
1758 timeSinceLastTick=100000;
1761 if (unreliableTimeout>0)
1763 if (timeSinceLastTick>=timeToNextUnreliableCull)
1765 if (unreliableLinkedListHead)
1780 RemoveFromUnreliableLinkedList(cur);
1800 timeToNextUnreliableCull=unreliableTimeout/(
CCTimeType)2;
1804 timeToNextUnreliableCull-=timeSinceLastTick;
1811 #if CC_TIME_TYPE_BYTES==4
1812 if ( statistics.messagesInResendBuffer!=0 && AckTimeout(time) )
1819 deadConnection =
true;
1823 if (congestionManager.ShouldSendACKs(time,timeSinceLastTick))
1825 SendACKs(s, systemAddress, time, rnr, updateBitStream);
1830 updateBitStream.
Reset();
1831 DatagramHeaderFormat dhfNAK;
1834 dhfNAK.isPacketPair=
false;
1835 dhfNAK.Serialize(&updateBitStream);
1836 NAKs.Serialize(&updateBitStream, GetMaxDatagramSizeExcludingMessageHeaderBits(),
true);
1837 SendBitStream( s, systemAddress, &updateBitStream, rnr, time );
1840 DatagramHeaderFormat dhf;
1841 dhf.needsBAndAs=congestionManager.GetIsInSlowStart();
1842 dhf.isContinuousSend=bandwidthExceededStatistic;
1847 bandwidthExceededStatistic=outgoingPacketBuffer.Size()>0;
1849 const bool hasDataToSendOrResend = IsResendQueueEmpty()==
false || bandwidthExceededStatistic;
1851 congestionManager.Update(time, hasDataToSendOrResend);
1853 statistics.BPSLimitByOutgoingBandwidthLimit =
BITS_TO_BYTES(bitsPerSecondLimit);
1854 statistics.BPSLimitByCongestionControl = congestionManager.GetBytesPerSecondLimitByCongestionControl();
1857 if (time > lastBpsClear+
1867 bpsMetrics[i].ClearExpired1(time);
1873 if (unreliableWithAckReceiptHistory.Size()>0)
1876 while (i < unreliableWithAckReceiptHistory.Size())
1879 if (time - unreliableWithAckReceiptHistory[i].nextActionTime < (((
CCTimeType)-1)/2) )
1885 memcpy(ackReceipt->
data+
sizeof(
MessageID), &unreliableWithAckReceiptHistory[i].sendReceiptSerial,
sizeof(
uint32_t));
1889 unreliableWithAckReceiptHistory.RemoveAtIndex(i);
1896 if (hasDataToSendOrResend==
true)
1900 bool pushedAnything;
1904 dhf.hasBAndAS=
false;
1905 ResetPacketsAndDatagrams();
1907 int transmissionBandwidth = congestionManager.GetTransmissionBandwidth(time, timeSinceLastTick, unacknowledgedBytes,dhf.isContinuousSend);
1908 int retransmissionBandwidth = congestionManager.GetRetransmissionBandwidth(time, timeSinceLastTick, unacknowledgedBytes,dhf.isContinuousSend);
1909 if (retransmissionBandwidth>0 || transmissionBandwidth>0)
1911 statistics.isLimitedByCongestionControl=
false;
1913 allDatagramSizesSoFar=0;
1916 while ((
int)
BITS_TO_BYTES(allDatagramSizesSoFar)<retransmissionBandwidth)
1918 pushedAnything=
false;
1921 while ( IsResendQueueEmpty()==
false )
1923 internalPacket = resendLinkedListHead;
1930 if ( datagramSizeSoFar + nextPacketBitLength > GetMaxDatagramSizeExcludingMessageHeaderBits() )
1947 PushPacket(time,internalPacket,
true);
1949 congestionManager.OnResend(time, internalPacket->
nextActionTime);
1953 pushedAnything=
true;
1955 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
1957 #if CC_TIME_TYPE_BYTES==4
1958 messageHandlerList[messageHandlerIndex]->OnInternalPacket(internalPacket, packetsToSendThisUpdateDatagramBoundaries.Size()+congestionManager.GetNextDatagramSequenceNumber(), systemAddress, (
SLNet::TimeMS) time,
true);
1960 messageHandlerList[messageHandlerIndex]->OnInternalPacket(internalPacket, packetsToSendThisUpdateDatagramBoundaries.Size()+congestionManager.GetNextDatagramSequenceNumber(), systemAddress, (
SLNet::TimeMS)(time/(
CCTimeType)1000),
true);
1966 InsertPacketIntoResendList( internalPacket, time,
false,
false );
1980 if (pushedAnything==
false)
1986 statistics.isLimitedByCongestionControl=
true;
1989 if ((
int)
BITS_TO_BYTES(allDatagramSizesSoFar)<transmissionBandwidth)
1992 allDatagramSizesSoFar=0;
1996 ResendBufferOverflow()==
false &&
1997 ((
int)
BITS_TO_BYTES(allDatagramSizesSoFar)<transmissionBandwidth ||
1999 (countdownToNextPacketPair==0 &&
2000 datagramsToSendThisUpdateIsPair.Size()==1))
2006 pushedAnything=
false;
2011 while (outgoingPacketBuffer.Size() &&
2012 statistics.isLimitedByOutgoingBandwidthLimit==
false)
2015 internalPacket=outgoingPacketBuffer.Peek();
2020 if (internalPacket->
data==0)
2023 outgoingPacketBuffer.Pop(0);
2025 statistics.messageInSendBuffer[(int)internalPacket->
priority]--;
2027 ReleaseToInternalPacketPool( internalPacket );
2031 internalPacket->
headerLength=GetMessageHeaderLengthBits(internalPacket);
2033 if ( datagramSizeSoFar + nextPacketBitLength > GetMaxDatagramSizeExcludingMessageHeaderBits() )
2054 outgoingPacketBuffer.Pop(0);
2057 statistics.messageInSendBuffer[(int)internalPacket->
priority]--;
2077 #if CC_TIME_TYPE_BYTES==4
2094 statistics.messagesInResendBuffer++;
2099 InsertPacketIntoResendList( internalPacket, time,
true, isReliable);
2103 sendReliableMessageNumberIndex++;
2107 unreliableWithAckReceiptHistory.Push(UnreliableWithAckReceiptNode(
2108 congestionManager.GetNextDatagramSequenceNumber() + packetsToSendThisUpdateDatagramBoundaries.Size(),
2110 congestionManager.GetRTOForRetransmission(internalPacket->
timesSent+1)+time
2122 PushPacket(time,internalPacket, isReliable);
2125 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
2127 #if CC_TIME_TYPE_BYTES==4
2128 messageHandlerList[messageHandlerIndex]->OnInternalPacket(internalPacket, packetsToSendThisUpdateDatagramBoundaries.Size()+congestionManager.GetNextDatagramSequenceNumber(), systemAddress, (
SLNet::TimeMS)time,
true);
2130 messageHandlerList[messageHandlerIndex]->OnInternalPacket(internalPacket, packetsToSendThisUpdateDatagramBoundaries.Size()+congestionManager.GetNextDatagramSequenceNumber(), systemAddress, (
SLNet::TimeMS)(time/(
CCTimeType)1000),
true);
2133 pushedAnything=
true;
2135 if (ResendBufferOverflow())
2143 if (datagramSizeSoFar==0)
2153 for (
unsigned int datagramIndex=0; datagramIndex < packetsToSendThisUpdateDatagramBoundaries.Size(); datagramIndex++)
2155 if (datagramIndex>0)
2156 dhf.isContinuousSend=
true;
2157 MessageNumberNode* messageNumberNode = 0;
2158 dhf.datagramNumber=congestionManager.GetAndIncrementNextDatagramSequenceNumber();
2159 dhf.isPacketPair=datagramsToSendThisUpdateIsPair[datagramIndex];
2163 bool isSecondOfPacketPair=dhf.isPacketPair && datagramIndex>0 && datagramsToSendThisUpdateIsPair[datagramIndex-1];
2164 unsigned int msgIndex, msgTerm;
2165 if (datagramIndex==0)
2168 msgTerm=packetsToSendThisUpdateDatagramBoundaries[0];
2172 msgIndex=packetsToSendThisUpdateDatagramBoundaries[datagramIndex-1];
2173 msgTerm=packetsToSendThisUpdateDatagramBoundaries[datagramIndex];
2177 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
2180 updateBitStream.
Reset();
2181 dhf.Serialize(&updateBitStream);
2184 while (msgIndex < msgTerm)
2187 if ( packetsToSendThisUpdate[msgIndex]->reliability !=
UNRELIABLE &&
2191 if (messageNumberNode==0)
2193 messageNumberNode = AddFirstToDatagramHistory(dhf.datagramNumber, packetsToSendThisUpdate[msgIndex]->reliableMessageNumber, time);
2197 messageNumberNode = AddSubsequentToDatagramHistory(messageNumberNode, packetsToSendThisUpdate[msgIndex]->reliableMessageNumber);
2202 WriteToBitStreamFromInternalPacket( &updateBitStream, packetsToSendThisUpdate[msgIndex], time );
2207 if (isSecondOfPacketPair)
2215 if (messageNumberNode==0)
2218 AddFirstToDatagramHistory(dhf.datagramNumber, time);
2224 congestionManager.OnSendBytes(time,
UDP_HEADER_SIZE+DatagramHeaderFormat::GetDataHeaderByteLength());
2226 SendBitStream( s, systemAddress, &updateBitStream, rnr, time );
2228 bandwidthExceededStatistic=outgoingPacketBuffer.Size()>0;
2236 if (bandwidthExceededStatistic==
true)
2237 timeOfLastContinualSend=time;
2239 timeOfLastContinualSend=0;
2242 ClearPacketsAndDatagrams();
2245 bandwidthExceededStatistic=outgoingPacketBuffer.Size()>0;
2262 (void) systemAddress;
2265 unsigned int length;
2271 if (packetloss > 0.0)
2277 if (minExtraPing > 0 || extraPingVariance > 0)
2279 #ifdef FLIP_SEND_ORDER_TEST
2281 DataAndTime *dat = SLNet::OP_NEW<DataAndTime>(__FILE__,__LINE__);
2282 memcpy(dat->data, (
char* ) bitStream->
GetData(), length );
2286 dat->remotePortRakNetWasStartedOn_PS3=remotePortRakNetWasStartedOn_PS3;
2287 dat->extraSocketOptions=extraSocketOptions;
2291 if (extraPingVariance>0)
2292 delay += (
randomMT() % extraPingVariance);
2295 DataAndTime *dat = SLNet::OP_NEW<DataAndTime>(__FILE__,__LINE__);
2296 memcpy(dat->data, (
char* ) bitStream->
GetData(), length );
2300 for (
unsigned int i=0; i < delayList.Size(); i++)
2302 if (dat->sendTime < delayList[i]->sendTime)
2304 delayList.PushAtHead(dat, i, __FILE__, __LINE__);
2310 delayList.Push(dat,__FILE__,__LINE__);
2317 #if LIBCAT_SECURITY==1
2320 unsigned char *buffer =
reinterpret_cast<unsigned char*
>( bitStream->
GetData() );
2326 bool success = auth_enc.Encrypt(buffer, buffer_size, length);
2333 RakAssert(length <= congestionManager.GetMTU());
2335 #ifdef USE_THREADED_SEND
2336 SendToThread::SendToThreadBlock *block = SendToThread::AllocateBlock();
2337 memcpy(block->data, bitStream->
GetData(), length);
2338 block->dataWriteOffset=length;
2339 block->extraSocketOptions=extraSocketOptions;
2340 block->remotePortRakNetWasStartedOn_PS3=remotePortRakNetWasStartedOn_PS3;
2342 block->systemAddress=systemAddress;
2343 SendToThread::ProcessBlock(block);
2360 if (outgoingPacketBuffer.Size()>0)
2373 statistics.messagesInResendBuffer!=0;
2377 return acknowlegements.Size() > 0;
2383 packetloss=_packetloss;
2384 minExtraPing=_minExtraPing;
2385 extraPingVariance=_extraPingVariance;
2393 splitMessageProgressInterval=interval;
2398 #if CC_TIME_TYPE_BYTES==4
2399 unreliableTimeout=timeoutMS;
2408 bool ReliabilityLayer::IsSendThrottled(
int MTUSize )
2434 void ReliabilityLayer::UpdateWindowFromPacketloss(
CCTimeType time )
2442 void ReliabilityLayer::UpdateWindowFromAck(
CCTimeType time )
2453 (void) messageNumber;
2461 for (
unsigned int messageHandlerIndex=0; messageHandlerIndex < messageHandlerList.
Size(); messageHandlerIndex++)
2463 #if CC_TIME_TYPE_BYTES==4
2464 messageHandlerList[messageHandlerIndex]->OnAck(messageNumber, systemAddress, time);
2466 messageHandlerList[messageHandlerIndex]->OnAck(messageNumber, systemAddress, (
SLNet::TimeMS)(time/(
CCTimeType)1000));
2492 statistics.messagesInResendBuffer--;
2523 RemoveFromList(internalPacket, isReliable);
2525 ReleaseToInternalPacketPool( internalPacket );
2535 return (
unsigned)-1;
2547 nextAckTimeToSend=time;
2548 acknowlegements.Insert(messageNumber);
2560 BitSize_t ReliabilityLayer::GetMaxMessageHeaderLengthBits(
void )
2565 return GetMessageHeaderLengthBits(&ip);
2628 unsigned char tempChar;
2639 tempChar=(
unsigned char)internalPacket->
reliability;
2641 bitStream->
WriteBits( (
const unsigned char *)&tempChar, 3,
true );
2697 bool bitStreamSucceeded;
2699 unsigned char tempChar;
2700 bool hasSplitPacket=
false;
2706 internalPacket = AllocateFromInternalPacketPool();
2707 if (internalPacket==0)
2717 bitStream->
ReadBits( (
unsigned char* ) ( &( tempChar ) ), 3 );
2719 readSuccess=bitStream->
Read(hasSplitPacket);
2773 if (readSuccess==
false ||
2781 ReleaseToInternalPacketPool( internalPacket );
2789 if (internalPacket->
data == 0)
2791 RakAssert(
"Out of memory in ReliabilityLayer::CreateInternalPacketFromBitStream" && 0);
2793 ReleaseToInternalPacketPool( internalPacket );
2803 if ( bitStreamSucceeded ==
false )
2806 RakAssert(
"Couldn't read all the data" && 0);
2809 ReleaseToInternalPacketPool( internalPacket );
2813 return internalPacket;
2820 void ReliabilityLayer::GetSHA1(
unsigned char *
const buffer,
unsigned int
2826 sha1.
Update( (
unsigned char* ) buffer, nbytes );
2834 bool ReliabilityLayer::CheckSHA1(
char code[ SHA1_LENGTH ],
unsigned char *
2835 const buffer,
unsigned int nbytes )
2838 GetSHA1( buffer, nbytes, code2 );
2841 if ( code[ i ] != code2[ i ] )
2920 if ( newPacketOrderingIndex >= waitingForPacketOrderingIndex - maxRange/(
OrderingIndexType)2+(
OrderingIndexType)1 && newPacketOrderingIndex < waitingForPacketOrderingIndex )
2928 newPacketOrderingIndex < waitingForPacketOrderingIndex )
2941 void ReliabilityLayer::SplitPacket(
InternalPacket *internalPacket )
2945 unsigned int headerLength = (
unsigned int)
BITS_TO_BYTES( GetMessageHeaderLengthBits( internalPacket ) );
2947 int maximumSendBlockBytes, byteOffset, bytesToSend;
2952 maximumSendBlockBytes = GetMaxDatagramSizeExcludingMessageHeaderBytes() -
BITS_TO_BYTES(GetMaxMessageHeaderLengthBits());
2955 internalPacket->
splitPacketCount = ( ( dataByteLength - 1 ) / ( maximumSendBlockBytes ) + 1 );
2959 bool usedAlloca=
false;
2972 internalPacketArray[ i ] = AllocateFromInternalPacketPool();
2976 *internalPacketArray[ i ]=*internalPacket;
2984 splitPacketIndex = 0;
2991 byteOffset = splitPacketIndex * maximumSendBlockBytes;
2992 bytesToSend = dataByteLength - byteOffset;
2994 if ( bytesToSend > maximumSendBlockBytes )
2995 bytesToSend = maximumSendBlockBytes;
2999 AllocInternalPacketData(internalPacketArray[ splitPacketIndex ], &refCounter, internalPacket->
data, internalPacket->
data + byteOffset);
3003 if ( bytesToSend != maximumSendBlockBytes )
3004 internalPacketArray[ splitPacketIndex ]->
dataBitLength = internalPacket->
dataBitLength - splitPacketIndex * ( maximumSendBlockBytes << 3 );
3006 internalPacketArray[ splitPacketIndex ]->
dataBitLength = bytesToSend << 3;
3008 internalPacketArray[ splitPacketIndex ]->
splitPacketIndex = splitPacketIndex;
3009 internalPacketArray[ splitPacketIndex ]->
splitPacketId = splitPacketId;
3012 }
while ( ++splitPacketIndex < internalPacket->splitPacketCount );
3020 outgoingPacketBuffer.StartSeries();
3027 AddToUnreliableLinkedList(internalPacketArray[ i ]);
3030 RakAssert(internalPacketArray[ i ]->messageNumberAssigned==
false);
3031 outgoingPacketBuffer.PushSeries(GetNextWeight(internalPacketArray[ i ]->priority), internalPacketArray[ i ],
_FILE_AND_LINE_);
3033 statistics.messageInSendBuffer[(int)internalPacketArray[ i ]->priority]++;
3034 statistics.bytesInSendBuffer[(int)(
int)internalPacketArray[ i ]->
priority]+=(double)
BITS_TO_BYTES(internalPacketArray[ i ]->dataBitLength);
3042 ReleaseToInternalPacketPool( internalPacket );
3044 if (usedAlloca==
false)
3056 index=splitPacketChannelList.GetIndexFromKey(internalPacket->
splitPacketId, &objectExists);
3057 if (objectExists==
false)
3059 SplitPacketChannel *newChannel = SLNet::OP_NEW<SplitPacketChannel>( __FILE__, __LINE__ );
3060 #if PREALLOCATE_LARGE_MESSAGES==1
3061 index=splitPacketChannelList.Insert(internalPacket->
splitPacketId, newChannel,
true, __FILE__,__LINE__);
3062 newChannel->returnedPacket=CreateInternalPacketCopy( internalPacket, 0, 0, time );
3063 newChannel->gotFirstPacket=
false;
3064 newChannel->splitPacketsArrived=0;
3066 RakAssert(newChannel->returnedPacket->data);
3069 index=splitPacketChannelList.Insert(internalPacket->
splitPacketId, newChannel,
true, __FILE__,__LINE__);
3076 #if PREALLOCATE_LARGE_MESSAGES==1
3077 splitPacketChannelList[index]->lastUpdateTime=time;
3078 splitPacketChannelList[index]->splitPacketsArrived++;
3079 splitPacketChannelList[index]->returnedPacket->dataBitLength+=internalPacket->
dataBitLength;
3084 splitPacketChannelList[index]->gotFirstPacket=
true;
3087 for (
unsigned int j=0; j < splitPacketChannelList[index]->splitPacketList.Size(); j++)
3090 FreeInternalPacketData(splitPacketChannelList[index]->splitPacketList[j], __FILE__, __LINE__ );
3091 ReleaseToInternalPacketPool(splitPacketChannelList[index]->splitPacketList[j]);
3095 splitPacketChannelList[index]->splitPacketList.Clear(
true,__FILE__,__LINE__);
3100 if (splitPacketChannelList[index]->gotFirstPacket==
true)
3107 splitPacketChannelList[index]->splitPacketList.Push(internalPacket,__FILE__,__LINE__);
3112 if (splitPacketChannelList[index]->gotFirstPacket==
true &&
3113 splitMessageProgressInterval &&
3117 splitPacketChannelList[index]->gotFirstPacket &&
3118 splitPacketChannelList[index]->splitPacketsArrived!=splitPacketChannelList[index]->returnedPacket->splitPacketCount &&
3119 (splitPacketChannelList[index]->splitPacketsArrived%splitMessageProgressInterval)==0
3127 InternalPacket *progressIndicator = AllocateFromInternalPacketPool();
3129 unsigned int l = (
unsigned int) splitPacketChannelList[index]->stride;
3130 const unsigned int len =
sizeof(
MessageID) +
sizeof(
unsigned int)*2 +
sizeof(
unsigned int) + l;
3131 AllocInternalPacketData(progressIndicator, len,
false, __FILE__, __LINE__ );
3136 temp=splitPacketChannelList[index]->splitPacketsArrived;
3137 memcpy(progressIndicator->
data+
sizeof(
MessageID), &temp,
sizeof(
unsigned int));
3139 memcpy(progressIndicator->
data+
sizeof(
MessageID)+
sizeof(
unsigned int)*1, &temp,
sizeof(
unsigned int));
3142 memcpy(progressIndicator->
data+
sizeof(
MessageID)+
sizeof(
unsigned int)*2, &temp,
sizeof(
unsigned int));
3144 memcpy(progressIndicator->
data+
sizeof(
MessageID)+
sizeof(
unsigned int)*3, splitPacketChannelList[index]->returnedPacket->data, (
size_t)
BITS_TO_BYTES(l));
3149 FreeInternalPacketData(internalPacket, __FILE__, __LINE__ );
3150 ReleaseToInternalPacketPool(internalPacket);
3154 splitPacketChannelList[index]->splitPacketList.Insert(internalPacket, __FILE__, __LINE__ );
3155 splitPacketChannelList[index]->lastUpdateTime=time;
3159 splitPacketChannelList[index]->firstPacket=internalPacket;
3162 if (splitMessageProgressInterval &&
3163 splitPacketChannelList[index]->firstPacket &&
3164 splitPacketChannelList[index]->splitPacketList.Size()!=splitPacketChannelList[index]->firstPacket->splitPacketCount &&
3165 (splitPacketChannelList[index]->splitPacketList.Size()%splitMessageProgressInterval)==0)
3172 InternalPacket *progressIndicator = AllocateFromInternalPacketPool();
3173 unsigned int length =
sizeof(
MessageID) +
sizeof(
unsigned int)*2 +
sizeof(
unsigned int) + (
unsigned int)
BITS_TO_BYTES(splitPacketChannelList[index]->firstPacket->dataBitLength);
3174 AllocInternalPacketData(progressIndicator, length,
false, __FILE__, __LINE__ );
3178 temp=splitPacketChannelList[index]->splitPacketList.Size();
3179 memcpy(progressIndicator->
data+
sizeof(
MessageID), &temp,
sizeof(
unsigned int));
3181 memcpy(progressIndicator->
data+
sizeof(
MessageID)+
sizeof(
unsigned int)*1, &temp,
sizeof(
unsigned int));
3182 temp=(
unsigned int)
BITS_TO_BYTES(splitPacketChannelList[index]->firstPacket->dataBitLength);
3183 memcpy(progressIndicator->
data+
sizeof(
MessageID)+
sizeof(
unsigned int)*2, &temp,
sizeof(
unsigned int));
3185 memcpy(progressIndicator->
data+
sizeof(
MessageID)+
sizeof(
unsigned int)*3, splitPacketChannelList[index]->firstPacket->data, (
size_t)
BITS_TO_BYTES(splitPacketChannelList[index]->firstPacket->dataBitLength));
3186 outputQueue.Push(progressIndicator, __FILE__, __LINE__ );
3199 #if PREALLOCATE_LARGE_MESSAGES==1
3200 InternalPacket *returnedPacket=splitPacketChannel->returnedPacket;
3203 return returnedPacket;
3210 internalPacket = CreateInternalPacketCopy( splitPacketChannel->
splitPacketList[0], 0, 0, time );
3234 return internalPacket;
3248 i=splitPacketChannelList.GetIndexFromKey(inSplitPacketId, &objectExists);
3249 splitPacketChannel=splitPacketChannelList[i];
3251 #if PREALLOCATE_LARGE_MESSAGES==1
3252 if (splitPacketChannel->splitPacketsArrived==splitPacketChannel->returnedPacket->splitPacketCount)
3258 SendACKs(s, systemAddress, time, rnr, updateBitStream);
3259 internalPacket=BuildPacketFromSplitPacketList(splitPacketChannel,time);
3260 splitPacketChannelList.RemoveAtIndex(i);
3261 return internalPacket;
3311 if ( dataByteLength > 0 )
3314 memcpy( copy->
data, original->
data + dataByteOffset, dataByteLength );
3328 #if PREALLOCATE_LARGE_MESSAGES==1
3360 void ReliabilityLayer::InsertPacketIntoResendList(
InternalPacket *internalPacket,
CCTimeType time,
bool firstResend,
bool modifyUnacknowledgedBytes )
3364 (void) internalPacket;
3366 AddToListTail(internalPacket, modifyUnacknowledgedBytes);
3376 return deadConnection;
3384 deadConnection=
true;
3396 double doubleDenominator;
3400 statistics.valueOverLastSecond[i]=bpsMetrics[i].GetBPS1Threadsafe(time);
3401 statistics.runningTotal[i]=bpsMetrics[i].GetTotal1();
3404 memcpy(rns, &statistics,
sizeof(statistics));
3416 if(doubleDenominator!=0)
3433 unsigned int ReliabilityLayer::GetResendListDataSize(
void)
const
3445 return (timeLastDatagramArrived-curTime)>10000 && curTime-timeLastDatagramArrived>timeoutTime;
3450 return nextSendTime;
3455 return timeBetweenPackets;
3458 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
3459 CCTimeType ReliabilityLayer::GetAckPing(
void)
const
3465 void ReliabilityLayer::ResetPacketsAndDatagrams(
void)
3469 packetsToSendThisUpdateDatagramBoundaries.Clear(
true,
_FILE_AND_LINE_);
3472 datagramSizeSoFar=0;
3478 datagramSizeSoFar+=bitsForThisPacket;
3480 allDatagramSizesSoFar+=bitsForThisPacket;
3498 void ReliabilityLayer::PushDatagram(
void)
3500 if (datagramSizeSoFar>0)
3502 packetsToSendThisUpdateDatagramBoundaries.Push(packetsToSendThisUpdate.Size(),
_FILE_AND_LINE_ );
3506 datagramSizeSoFar=0;
3521 bool ReliabilityLayer::TagMostRecentPushAsSecondOfPacketPair(
void)
3523 if (datagramsToSendThisUpdateIsPair.Size()>=2)
3525 datagramsToSendThisUpdateIsPair[datagramsToSendThisUpdateIsPair.Size()-2]=
true;
3526 datagramsToSendThisUpdateIsPair[datagramsToSendThisUpdateIsPair.Size()-1]=
true;
3532 void ReliabilityLayer::ClearPacketsAndDatagrams(
void)
3535 for (i=0; i < packetsToDeallocThisUpdate.Size(); i++)
3538 if (packetsToDeallocThisUpdate[i])
3540 RemoveFromUnreliableLinkedList(packetsToSendThisUpdate[i]);
3543 ReleaseToInternalPacketPool( packetsToSendThisUpdate[i] );
3549 void ReliabilityLayer::MoveToListHead(
InternalPacket *internalPacket)
3551 if ( internalPacket == resendLinkedListHead )
3553 if (resendLinkedListHead==0)
3557 resendLinkedListHead=internalPacket;
3562 internalPacket->
resendNext=resendLinkedListHead;
3565 resendLinkedListHead->
resendPrev=internalPacket;
3566 resendLinkedListHead=internalPacket;
3572 void ReliabilityLayer::RemoveFromList(
InternalPacket *internalPacket,
bool modifyUnacknowledgedBytes)
3578 if ( internalPacket == resendLinkedListHead )
3579 resendLinkedListHead = newPosition;
3580 if (resendLinkedListHead==internalPacket)
3581 resendLinkedListHead=0;
3583 if (modifyUnacknowledgedBytes)
3594 void ReliabilityLayer::AddToListTail(
InternalPacket *internalPacket,
bool modifyUnacknowledgedBytes)
3596 if (modifyUnacknowledgedBytes)
3602 if (resendLinkedListHead==0)
3606 resendLinkedListHead=internalPacket;
3609 internalPacket->
resendNext=resendLinkedListHead;
3612 resendLinkedListHead->
resendPrev=internalPacket;
3618 void ReliabilityLayer::PopListHead(
bool modifyUnacknowledgedBytes)
3621 RemoveFromList(resendLinkedListHead, modifyUnacknowledgedBytes);
3624 bool ReliabilityLayer::IsResendQueueEmpty(
void)
const
3626 return resendLinkedListHead==0;
3631 BitSize_t maxDatagramPayload = GetMaxDatagramSizeExcludingMessageHeaderBits();
3633 while (acknowlegements.Size()>0)
3636 updateBitStream.
Reset();
3637 DatagramHeaderFormat dhf;
3640 dhf.isPacketPair=
false;
3641 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
3642 dhf.sourceSystemTime=time;
3647 if (remoteSystemNeedsBAndAS)
3649 congestionManager.OnSendAckGetBAndAS(time, &hasBAndAS,&B,&AS);
3651 dhf.hasBAndAS=hasBAndAS;
3654 dhf.hasBAndAS=
false;
3655 #if INCLUDE_TIMESTAMP_WITH_DATAGRAMS==1
3656 dhf.sourceSystemTime=nextAckTimeToSend;
3659 updateBitStream.
Reset();
3660 dhf.Serialize(&updateBitStream);
3662 acknowlegements.Serialize(&updateBitStream, maxDatagramPayload,
true);
3663 SendBitStream( s, systemAddress, &updateBitStream, rnr, time );
3688 InternalPacket* ReliabilityLayer::AllocateFromInternalPacketPool(
void)
3703 void ReliabilityLayer::ReleaseToInternalPacketPool(
InternalPacket *ip)
3708 void ReliabilityLayer::RemoveFromUnreliableLinkedList(
InternalPacket *internalPacket)
3721 if ( internalPacket == unreliableLinkedListHead )
3722 unreliableLinkedListHead = newPosition;
3723 if (unreliableLinkedListHead==internalPacket)
3724 unreliableLinkedListHead=0;
3728 void ReliabilityLayer::AddToUnreliableLinkedList(
InternalPacket *internalPacket)
3737 if (unreliableLinkedListHead==0)
3741 unreliableLinkedListHead=internalPacket;
3751 void ReliabilityLayer::ValidateResendList(
void)
const
3771 bool ReliabilityLayer::ResendBufferOverflow(
void)
const
3776 return resendBuffer[index1]!=0;
3782 if (datagramHistory.IsEmpty())
3785 if (congestionManager.LessThan(index, datagramHistoryPopCount))
3789 if (offsetIntoList >= datagramHistory.Size())
3792 *timeSent=datagramHistory[offsetIntoList].timeSent;
3793 return datagramHistory[offsetIntoList].head;
3799 MessageNumberNode *mnm = datagramHistory[offsetIntoList].head;
3800 MessageNumberNode *next;
3807 datagramHistory[offsetIntoList].head=0;
3812 (void) datagramNumber;
3815 RemoveFromDatagramHistory(datagramHistoryPopCount);
3816 datagramHistory.Pop();
3817 datagramHistoryPopCount++;
3820 datagramHistory.Push(DatagramHistoryNode(0, timeSent),
_FILE_AND_LINE_);
3826 (void) datagramNumber;
3830 RemoveFromDatagramHistory(datagramHistoryPopCount);
3831 datagramHistory.Pop();
3832 datagramHistoryPopCount++;
3835 MessageNumberNode *mnm = datagramHistoryMessagePool.Allocate(
_FILE_AND_LINE_);
3837 mnm->messageNumber=messageNumber;
3838 datagramHistory.Push(DatagramHistoryNode(mnm, timeSent),
_FILE_AND_LINE_);
3843 ReliabilityLayer::MessageNumberNode* ReliabilityLayer::AddSubsequentToDatagramHistory(MessageNumberNode *messageNumberNode,
DatagramSequenceNumberType messageNumber)
3845 messageNumberNode->next=datagramHistoryMessagePool.Allocate(
_FILE_AND_LINE_);
3846 messageNumberNode->next->messageNumber=messageNumber;
3847 messageNumberNode->next->next=0;
3848 return messageNumberNode->next;
3854 internalPacket->
data=ourOffset;
3860 (*refCounter)->sharedDataBlock=externallyAllocatedPtr;
3863 (*refCounter)->refCount++;
3867 void ReliabilityLayer::AllocInternalPacketData(
InternalPacket *internalPacket,
unsigned char *externallyAllocatedPtr)
3870 internalPacket->
data=externallyAllocatedPtr;
3873 void ReliabilityLayer::AllocInternalPacketData(
InternalPacket *internalPacket,
unsigned int numBytes,
bool allowStack,
const char *file,
unsigned int line)
3875 if (allowStack && numBytes <=
sizeof(internalPacket->
stackData))
3887 void ReliabilityLayer::FreeInternalPacketData(
InternalPacket *internalPacket,
const char *file,
unsigned int line)
3889 if (internalPacket==0)
3903 refCountedDataPool.Release(internalPacket->
refCountedData,file, line);
3909 if (internalPacket->
data==0)
3913 internalPacket->
data=0;
3918 internalPacket->
data=0;
3922 unsigned int ReliabilityLayer::GetMaxDatagramSizeExcludingMessageHeaderBytes(
void)
3924 unsigned int val = congestionManager.GetMTU() - DatagramHeaderFormat::GetDataHeaderByteLength();
3926 #if LIBCAT_SECURITY==1
3928 val -= cat::AuthenticatedEncryption::OVERHEAD_BYTES;
3934 BitSize_t ReliabilityLayer::GetMaxDatagramSizeExcludingMessageHeaderBits(
void)
3936 return BYTES_TO_BITS(GetMaxDatagramSizeExcludingMessageHeaderBytes());
3939 void ReliabilityLayer::InitHeapWeights(
void)
3942 outgoingPacketBufferNextWeights[priorityLevel]=(1<<priorityLevel)*priorityLevel+priorityLevel;
3947 uint64_t next = outgoingPacketBufferNextWeights[priorityLevel];
3948 if (outgoingPacketBuffer.Size()>0)
3950 int peekPL = outgoingPacketBuffer.Peek()->priority;
3954 next=min + (1<<priorityLevel)*priorityLevel+priorityLevel;
3955 outgoingPacketBufferNextWeights[priorityLevel]=next+(1<<priorityLevel)*(priorityLevel+1)+priorityLevel;