SLikeNet  0.1.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
CloudServer.cpp
Go to the documentation of this file.
1 /*
2  * Original work: Copyright (c) 2014, Oculus VR, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * RakNet License.txt file in the licenses directory of this source tree. An additional grant
7  * of patent rights can be found in the RakNet Patents.txt file in the same directory.
8  *
9  *
10  * Modified work: Copyright (c) 2016-2017, SLikeSoft UG (haftungsbeschränkt)
11  *
12  * This source code was modified by SLikeSoft. Modifications are licensed under the MIT-style
13  * license found in the license.txt file in the root directory of this source tree.
14  */
15 
17 #if _RAKNET_SUPPORT_CloudServer==1
18 
19 #include "slikenet/CloudServer.h"
20 #include "slikenet/GetTime.h"
22 #include "slikenet/BitStream.h"
23 #include "slikenet/peerinterface.h"
24 
25 enum ServerToServerCommands
26 {
27  STSC_PROCESS_GET_REQUEST,
28  STSC_PROCESS_GET_RESPONSE,
29  STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS,
30  STSC_ADD_UPLOADED_KEY,
31  STSC_ADD_SUBSCRIBED_KEY,
32  STSC_REMOVE_UPLOADED_KEY,
33  STSC_REMOVE_SUBSCRIBED_KEY,
34  STSC_DATA_CHANGED,
35 };
36 
37 using namespace SLNet;
38 
39 int CloudServer::RemoteServerComp(const RakNetGUID &key, RemoteServer* const &data )
40 {
41  if (key < data->serverAddress)
42  return -1;
43  if (key > data->serverAddress)
44  return 1;
45  return 0;
46 }
47 int CloudServer::KeySubscriberIDComp(const CloudKey &key, KeySubscriberID * const &data )
48 {
49  if (key.primaryKey < data->key.primaryKey)
50  return -1;
51  if (key.primaryKey > data->key.primaryKey)
52  return 1;
53  if (key.secondaryKey < data->key.secondaryKey)
54  return -1;
55  if (key.secondaryKey > data->key.secondaryKey)
56  return 1;
57  return 0;
58 }
59 int CloudServer::KeyDataPtrComp( const RakNetGUID &key, CloudData* const &data )
60 {
61  if (key < data->clientGUID)
62  return -1;
63  if (key > data->clientGUID)
64  return 1;
65  return 0;
66 }
67 int CloudServer::KeyDataListComp( const CloudKey &key, CloudDataList * const &data )
68 {
69  if (key.primaryKey < data->key.primaryKey)
70  return -1;
71  if (key.primaryKey > data->key.primaryKey)
72  return 1;
73  if (key.secondaryKey < data->key.secondaryKey)
74  return -1;
75  if (key.secondaryKey > data->key.secondaryKey)
76  return 1;
77  return 0;
78 }
80 {
81  if (key < data->serverAddress)
82  return -1;
83  if (key > data->serverAddress)
84  return 1;
85  return 0;
86 }
88 {
89  if (key < data->requestId)
90  return -1;
91  if (key > data->requestId)
92  return -1;
93  return 0;
94 }
95 void CloudServer::CloudQueryWithAddresses::Serialize(bool writeToBitstream, BitStream *bitStream)
96 {
97  cloudQuery.Serialize(writeToBitstream, bitStream);
98 
99  if (writeToBitstream)
100  {
101  bitStream->WriteCasted<uint16_t>(specificSystems.Size());
103  for (uint16_t i=0; i < specificSystems.Size(); i++)
104  {
105  bitStream->Write(specificSystems[i]);
106  }
107  }
108  else
109  {
110  uint16_t specificSystemsCount;
111  RakNetGUID addressOrGuid;
112  bitStream->Read(specificSystemsCount);
113  for (uint16_t i=0; i < specificSystemsCount; i++)
114  {
115  bitStream->Read(addressOrGuid);
116  specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
117  }
118  }
119 }
121 {
122  unsigned int i;
123  for (i=0; i < remoteServerResponses.Size(); i++)
124  if (remoteServerResponses[i]->gotResult==false)
125  return false;
126  return true;
127 }
129 {
130  unsigned int i;
131  for (i=0; i < remoteServerResponses.Size(); i++)
132  {
133  remoteServerResponses[i]->Clear(allocator);
134  SLNet::OP_DELETE(remoteServerResponses[i], _FILE_AND_LINE_);
135  }
136  remoteServerResponses.Clear(false, _FILE_AND_LINE_);
137 }
139 {
140  unsigned int i;
141  for (i=0; i < queryResult.rowsReturned.Size(); i++)
142  {
143  allocator->DeallocateRowData(queryResult.rowsReturned[i]->data);
144  allocator->DeallocateCloudQueryRow(queryResult.rowsReturned[i]);
145  }
146  queryResult.rowsReturned.Clear(false, _FILE_AND_LINE_);
147 }
149 {
154 }
156 {
157  Clear();
158 }
160 {
162 }
164 {
165  maxBytesPerDowload=bytes;
166 }
167 void CloudServer::Update(void)
168 {
169  // Timeout getRequests
170  SLNet::Time time = SLNet::Time();
171  if (time > nextGetRequestsCheck)
172  {
173  nextGetRequestsCheck=time+1000;
174 
175  unsigned int i=0;
176  while (i < getRequests.Size())
177  {
178  if (time - getRequests[i]->requestStartTime > 3000)
179  {
180  // Remote server is not responding, just send back data with whoever did respond
182  getRequests[i]->Clear(this);
185  }
186  else
187  {
188  i++;
189  }
190  }
191  }
192 }
194 {
195  switch (packet->data[0])
196  {
198  OnPostRequest(packet);
201  OnReleaseRequest(packet);
204  OnGetRequest(packet);
207  OnUnsubscribeRequest(packet);
210  if (packet->length>1)
211  {
212  switch (packet->data[1])
213  {
214  case STSC_PROCESS_GET_REQUEST:
217  case STSC_PROCESS_GET_RESPONSE:
220  case STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS:
223  case STSC_ADD_UPLOADED_KEY:
226  case STSC_ADD_SUBSCRIBED_KEY:
229  case STSC_REMOVE_UPLOADED_KEY:
232  case STSC_REMOVE_SUBSCRIBED_KEY:
235  case STSC_DATA_CHANGED:
236  OnServerDataChanged(packet);
238  }
239  }
241  }
242  return RR_CONTINUE_PROCESSING;
243 }
245 {
246  SLNet::BitStream bsIn(packet->data, packet->length, false);
247  bsIn.IgnoreBytes(sizeof(MessageID));
248  CloudKey key;
249  key.Serialize(false,&bsIn);
250  uint32_t dataLengthBytes;
251  bsIn.Read(dataLengthBytes);
252  if (maxUploadBytesPerClient>0 && dataLengthBytes>maxUploadBytesPerClient)
253  return; // Exceeded max upload bytes
254 
255  bsIn.AlignReadToByteBoundary();
256  for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
257  {
258  if (queryFilters[filterIndex]->OnPostRequest(packet->guid, packet->systemAddress, key, dataLengthBytes, (const char*) bsIn.GetData()+BITS_TO_BYTES(bsIn.GetReadOffset()))==false)
259  return;
260  }
261 
262  unsigned char *data;
263  if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
264  {
265  data = (unsigned char *) rakMalloc_Ex(dataLengthBytes,_FILE_AND_LINE_);
266  if (data==0)
267  {
269  return;
270  }
271  bsIn.ReadAlignedBytes(data,dataLengthBytes);
272  }
273  else
274  data=0;
275 
276  // Add this system to remoteSystems if they aren't there already
277  DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
278  RemoteCloudClient *remoteCloudClient;
279  if (remoteSystemsHashIndex.IsInvalid())
280  {
281  remoteCloudClient = SLNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
282  remoteCloudClient->uploadedKeys.Insert(key,key,true,_FILE_AND_LINE_);
283  remoteCloudClient->uploadedBytes=0;
284  remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
285  }
286  else
287  {
288  remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
289  bool objectExists;
290  // Add to RemoteCloudClient::uploadedKeys if it isn't there already
291  unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
292  if (objectExists==false)
293  {
294  remoteCloudClient->uploadedKeys.InsertAtIndex(key, uploadedKeysIndex, _FILE_AND_LINE_);
295  }
296  }
297 
298  bool cloudDataAlreadyUploaded;
299  unsigned int dataRepositoryIndex;
300  bool dataRepositoryExists;
301  CloudDataList* cloudDataList = GetOrAllocateCloudDataList(key, &dataRepositoryExists, dataRepositoryIndex);
302  if (dataRepositoryExists==false)
303  {
304  cloudDataList->uploaderCount=1;
305  cloudDataAlreadyUploaded=false;
306  }
307  else
308  {
309  cloudDataAlreadyUploaded=cloudDataList->uploaderCount>0;
310  cloudDataList->uploaderCount++;
311  }
312 
313  CloudData *cloudData;
314  bool keyDataListExists;
315  unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
316  if (keyDataListExists==false)
317  {
318  if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes+dataLengthBytes>maxUploadBytesPerClient)
319  {
320  // Undo prior insertion of cloudDataList into cloudData if needed
321  if (keyDataListExists==false)
322  {
323  SLNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
324  dataRepository.RemoveAtIndex(dataRepositoryIndex);
325  }
326 
327  if (remoteCloudClient->IsUnused())
328  {
329  SLNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
331  }
332 
333  if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
335 
336  return;
337  }
338 
339  cloudData = SLNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
340  cloudData->dataLengthBytes=dataLengthBytes;
341  cloudData->isUploaded=true;
343  {
344  cloudData->serverSystemAddress=forceAddress;
345  cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
346  }
347  else
348  {
349  cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
350  if (cloudData->serverSystemAddress.IsLoopback())
351  cloudData->serverSystemAddress.FromString(rakPeerInterface->GetLocalIP(0));
352  }
353  if (cloudData->serverSystemAddress.GetPort()==0)
354  {
355  // Fix localhost port
356  cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetSocket(UNASSIGNED_SYSTEM_ADDRESS)->GetBoundAddress().GetPort());
357  }
358  cloudData->clientSystemAddress=packet->systemAddress;
359  cloudData->serverGUID=rakPeerInterface->GetMyGUID();
360  cloudData->clientGUID=packet->guid;
361  cloudDataList->keyData.Insert(packet->guid,cloudData,true,_FILE_AND_LINE_);
362  }
363  else
364  {
365  cloudData = cloudDataList->keyData[keyDataListIndex];
366 
367  if (cloudDataAlreadyUploaded==false)
368  {
370  {
371  cloudData->serverSystemAddress=forceAddress;
372  cloudData->serverSystemAddress.SetPortHostOrder(rakPeerInterface->GetExternalID(packet->systemAddress).GetPort());
373  }
374  else
375  {
376  cloudData->serverSystemAddress=rakPeerInterface->GetExternalID(packet->systemAddress);
377  }
378  if (cloudData->serverSystemAddress.GetPort()==0)
379  {
380  // Fix localhost port
382  }
383 
384  cloudData->clientSystemAddress=packet->systemAddress;
385  }
386 
387  if (maxUploadBytesPerClient>0 && remoteCloudClient->uploadedBytes-cloudData->dataLengthBytes+dataLengthBytes>maxUploadBytesPerClient)
388  {
389  // Undo prior insertion of cloudDataList into cloudData if needed
390  if (dataRepositoryExists==false)
391  {
392  SLNet::OP_DELETE(cloudDataList,_FILE_AND_LINE_);
393  dataRepository.RemoveAtIndex(dataRepositoryIndex);
394  }
395  return;
396  }
397  else
398  {
399  // Subtract already used bytes we are overwriting
400  remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
401  }
402 
403  if (cloudData->allocatedData!=0)
404  rakFree_Ex(cloudData->allocatedData,_FILE_AND_LINE_);
405  }
406 
407  if (dataLengthBytes>CLOUD_SERVER_DATA_STACK_SIZE)
408  {
409  // Data already allocated
410  cloudData->allocatedData=data;
411  cloudData->dataPtr=data;
412  }
413  else
414  {
415  // Read to stack
416  if (dataLengthBytes>0)
417  bsIn.ReadAlignedBytes(cloudData->stackData,dataLengthBytes);
418  cloudData->allocatedData=0;
419  cloudData->dataPtr=cloudData->stackData;
420  }
421  // Update how many bytes were written for this data
422  cloudData->dataLengthBytes=dataLengthBytes;
423  remoteCloudClient->uploadedBytes+=dataLengthBytes;
424 
425  if (cloudDataAlreadyUploaded==false)
426  {
427  // New data field
428  SendUploadedKeyToServers(cloudDataList->key);
429  }
430 
431  // Existing data field changed
432  NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, true );
433  NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, true );
434 
435  // Send update to all remote servers that subscribed to this key
436  NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, true);
437 
438  // I could have also subscribed to a key not yet updated locally
439  // This means I have to go through every RemoteClient that wants this key
440  // Seems like cloudData->specificSubscribers is unnecessary in that case
441 }
443 {
444  SLNet::BitStream bsIn(packet->data, packet->length, false);
445  bsIn.IgnoreBytes(sizeof(MessageID));
446 
447  uint16_t keyCount;
448  bsIn.Read(keyCount);
449 
450  if (keyCount==0)
451  return;
452 
453  DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
454  if (remoteSystemIndex.IsInvalid()==true)
455  return;
456 
457  RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
458 
459  CloudKey key;
460 
461  // Read all in a list first so I can run filter on it
463  for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
464  {
465  key.Serialize(false, &bsIn);
466  cloudKeys.Push(key, _FILE_AND_LINE_);
467  }
468 
469  for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
470  {
471  if (queryFilters[filterIndex]->OnReleaseRequest(packet->guid, packet->systemAddress, cloudKeys)==false)
472  return;
473  }
474 
475  for (uint16_t keyCountIndex=0; keyCountIndex < keyCount; keyCountIndex++)
476  {
477  // Serialize in list above so I can run the filter on it
478  // key.Serialize(false, &bsIn);
479  key=cloudKeys[keyCountIndex];
480 
481  // Remove remote systems uploaded keys
482  bool objectExists;
483  unsigned int uploadedKeysIndex = remoteCloudClient->uploadedKeys.GetIndexFromKey(key,&objectExists);
484  if (objectExists)
485  {
486  bool dataRepositoryExists;
487  unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
488  CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
489  RakAssert(cloudDataList);
490 
491  CloudData *cloudData;
492  bool keyDataListExists;
493  unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(packet->guid, &keyDataListExists);
494  cloudData = cloudDataList->keyData[keyDataListIndex];
495 
496  remoteCloudClient->uploadedKeys.RemoveAtIndex(uploadedKeysIndex);
497  remoteCloudClient->uploadedBytes-=cloudData->dataLengthBytes;
498  cloudDataList->uploaderCount--;
499 
500  // Broadcast destruction of this key to subscribers
501  NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
502  NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
503  NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
504 
505  cloudData->Clear();
506 
507  if (cloudData->IsUnused())
508  {
509  SLNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
510  cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
511  if (cloudDataList->IsNotUploaded())
512  {
513  // Tell other servers that this key is no longer uploaded, so they do not request it from us
514  RemoveUploadedKeyFromServers(cloudDataList->key);
515  }
516 
517  if (cloudDataList->IsUnused())
518  {
519  SLNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
520  dataRepository.RemoveAtIndex(dataRepositoryIndex);
521  }
522  }
523 
524  if (remoteCloudClient->IsUnused())
525  {
526  SLNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
527  remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
528  break;
529  }
530  }
531  }
532 }
533 void CloudServer::OnGetRequest(Packet *packet)
534 {
535  SLNet::BitStream bsIn(packet->data, packet->length, false);
536  bsIn.IgnoreBytes(sizeof(MessageID));
537  uint16_t specificSystemsCount;
538  CloudKey cloudKey;
539 
540  // Create a new GetRequest
541  GetRequest *getRequest;
542  getRequest = SLNet::OP_NEW<GetRequest>(_FILE_AND_LINE_);
543  getRequest->cloudQueryWithAddresses.cloudQuery.Serialize(false, &bsIn);
544  getRequest->requestingClient=packet->guid;
545 
546  RakNetGUID addressOrGuid;
547  bsIn.Read(specificSystemsCount);
548  for (uint16_t i=0; i < specificSystemsCount; i++)
549  {
550  bsIn.Read(addressOrGuid);
551  getRequest->cloudQueryWithAddresses.specificSystems.Push(addressOrGuid, _FILE_AND_LINE_);
552  }
553 
554  if (getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size()==0)
555  {
556  SLNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
557  return;
558  }
559 
560  for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
561  {
562  if (queryFilters[filterIndex]->OnGetRequest(packet->guid, packet->systemAddress, getRequest->cloudQueryWithAddresses.cloudQuery, getRequest->cloudQueryWithAddresses.specificSystems )==false)
563  return;
564  }
565 
566  getRequest->requestStartTime= SLNet::GetTime();
567  getRequest->requestId=nextGetRequestId++;
568 
569  // Send request to servers that have this data
570  DataStructures::List<RemoteServer*> remoteServersWithData;
571  GetServersWithUploadedKeys(getRequest->cloudQueryWithAddresses.cloudQuery.keys, remoteServersWithData);
572 
573  if (remoteServersWithData.Size()==0)
574  {
575  ProcessAndTransmitGetRequest(getRequest);
576  }
577  else
578  {
579  SLNet::BitStream bsOut;
581  bsOut.Write((MessageID)STSC_PROCESS_GET_REQUEST);
582  getRequest->cloudQueryWithAddresses.Serialize(true, &bsOut);
583  bsOut.Write(getRequest->requestId);
584 
585  for (unsigned int remoteServerIndex=0; remoteServerIndex < remoteServersWithData.Size(); remoteServerIndex++)
586  {
587  BufferedGetResponseFromServer* bufferedGetResponseFromServer = SLNet::OP_NEW<BufferedGetResponseFromServer>(_FILE_AND_LINE_);
588  bufferedGetResponseFromServer->serverAddress=remoteServersWithData[remoteServerIndex]->serverAddress;
589  bufferedGetResponseFromServer->gotResult=false;
590  getRequest->remoteServerResponses.Insert(remoteServersWithData[remoteServerIndex]->serverAddress, bufferedGetResponseFromServer, true, _FILE_AND_LINE_);
591 
592  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServersWithData[remoteServerIndex]->serverAddress, false);
593  }
594 
595  // Record that this system made this request
596  getRequests.Insert(getRequest->requestId, getRequest, true, _FILE_AND_LINE_);
597  }
598 
599  if (getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults)
600  {
601  // Add to key subscription list for the client, which contains a keyId / specificUploaderList pair
602  DataStructures::HashIndex remoteSystemsHashIndex = remoteSystems.GetIndexOf(packet->guid);
603  RemoteCloudClient *remoteCloudClient;
604  if (remoteSystemsHashIndex.IsInvalid())
605  {
606  remoteCloudClient = SLNet::OP_NEW<RemoteCloudClient>(_FILE_AND_LINE_);
607  remoteCloudClient->uploadedBytes=0;
608  remoteSystems.Push(packet->guid, remoteCloudClient, _FILE_AND_LINE_);
609  }
610  else
611  {
612  remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemsHashIndex);
613  }
614 
615  unsigned int keyIndex;
616  for (keyIndex=0; keyIndex < getRequest->cloudQueryWithAddresses.cloudQuery.keys.Size(); keyIndex++)
617  {
618  cloudKey = getRequest->cloudQueryWithAddresses.cloudQuery.keys[keyIndex];
619 
620  unsigned int keySubscriberIndex;
621  bool hasKeySubscriber;
622  keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
623  KeySubscriberID* keySubscriberId;
624  if (hasKeySubscriber)
625  {
626  DataStructures::List<RakNetGUID> specificSystems;
627  UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
628  }
629 
630  keySubscriberId = SLNet::OP_NEW<KeySubscriberID>(_FILE_AND_LINE_);
631  keySubscriberId->key=cloudKey;
632 
633  unsigned int specificSystemIndex;
634  for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
635  {
636  keySubscriberId->specificSystemsSubscribedTo.Insert(getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex], true, _FILE_AND_LINE_);
637  }
638 
639  remoteCloudClient->subscribedKeys.InsertAtIndex(keySubscriberId, keySubscriberIndex, _FILE_AND_LINE_);
640 
641  // Add CloudData in a similar way
642  unsigned int dataRepositoryIndex;
643  bool dataRepositoryExists;
644  CloudDataList* cloudDataList = GetOrAllocateCloudDataList(cloudKey, &dataRepositoryExists, dataRepositoryIndex);
645 
646  // If this is the first local client to subscribe to this key, call SendSubscribedKeyToServers
647  if (cloudDataList->subscriberCount==0)
648  SendSubscribedKeyToServers(cloudKey);
649 
650  // If the subscription is specific, may have to also allocate CloudData
651  if (getRequest->cloudQueryWithAddresses.specificSystems.Size())
652  {
653  CloudData *cloudData;
654  bool keyDataListExists;
655 
656  for (specificSystemIndex=0; specificSystemIndex < getRequest->cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
657  {
658  RakNetGUID specificSystem = getRequest->cloudQueryWithAddresses.specificSystems[specificSystemIndex];
659 
660  unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSystem, &keyDataListExists);
661  if (keyDataListExists==false)
662  {
663  cloudData = SLNet::OP_NEW<CloudData>(_FILE_AND_LINE_);
664  cloudData->dataLengthBytes=0;
665  cloudData->allocatedData=0;
666  cloudData->isUploaded=false;
667  cloudData->dataPtr=0;
668  cloudData->serverSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
669  cloudData->clientSystemAddress=UNASSIGNED_SYSTEM_ADDRESS;
670  cloudData->serverGUID=rakPeerInterface->GetMyGUID();
671  cloudData->clientGUID=specificSystem;
672  cloudDataList->keyData.Insert(specificSystem,cloudData,true,_FILE_AND_LINE_);
673  }
674  else
675  {
676  cloudData = cloudDataList->keyData[keyDataListIndex];
677  }
678 
679  ++cloudDataList->subscriberCount;
680  cloudData->specificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
681  }
682  }
683  else
684  {
685  ++cloudDataList->subscriberCount;
686  cloudDataList->nonSpecificSubscribers.Insert(packet->guid, packet->guid, true, _FILE_AND_LINE_);
687 
688  // Remove packet->guid from CloudData::specificSubscribers among all instances of cloudDataList->keyData
689  unsigned int subscribedKeysIndex;
690  bool subscribedKeysIndexExists;
691  subscribedKeysIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudDataList->key, &subscribedKeysIndexExists);
692  if (subscribedKeysIndexExists)
693  {
694  keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
695  for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
696  {
697  bool keyDataExists;
698  unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
699  if (keyDataExists)
700  {
701  CloudData *keyData = cloudDataList->keyData[keyDataIndex];
702  keyData->specificSubscribers.Remove(packet->guid);
703  --cloudDataList->subscriberCount;
704  }
705  }
706  }
707  }
708  }
709 
710  if (remoteCloudClient->subscribedKeys.Size()==0)
711  {
712  // Didn't do anything
714  SLNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
715  }
716  }
717 
718  if (remoteServersWithData.Size()==0)
719  SLNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
720 }
722 {
723  SLNet::BitStream bsIn(packet->data, packet->length, false);
724  bsIn.IgnoreBytes(sizeof(MessageID));
725 
726  DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(packet->guid);
727  if (remoteSystemIndex.IsInvalid()==true)
728  return;
729 
730  RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
731 
732  uint16_t keyCount, specificSystemCount;
734  DataStructures::List<RakNetGUID> specificSystems;
735  uint16_t index;
736 
737  CloudKey cloudKey;
738  bsIn.Read(keyCount);
739  for (index=0; index < keyCount; index++)
740  {
741  cloudKey.Serialize(false, &bsIn);
742  cloudKeys.Push(cloudKey, _FILE_AND_LINE_);
743  }
744 
745  RakNetGUID specificSystem;
746  bsIn.Read(specificSystemCount);
747  for (index=0; index < specificSystemCount; index++)
748  {
749  bsIn.Read(specificSystem);
750  specificSystems.Push(specificSystem, _FILE_AND_LINE_);
751  }
752 
753  for (unsigned int filterIndex=0; filterIndex < queryFilters.Size(); filterIndex++)
754  {
755  if (queryFilters[filterIndex]->OnUnsubscribeRequest(packet->guid, packet->systemAddress, cloudKeys, specificSystems )==false)
756  return;
757  }
758 
759 // CloudDataList *cloudDataList;
760  bool dataRepositoryExists;
761 // unsigned int dataRepositoryIndex;
762 
763  for (index=0; index < keyCount; index++)
764  {
765  cloudKey = cloudKeys[index];
766 
767  // dataRepositoryIndex =
768  dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
769  if (dataRepositoryExists==false)
770  continue;
771 // cloudDataList = dataRepository[dataRepositoryIndex];
772 
773  unsigned int keySubscriberIndex;
774  bool hasKeySubscriber;
775  keySubscriberIndex = remoteCloudClient->subscribedKeys.GetIndexFromKey(cloudKey, &hasKeySubscriber);
776 
777  if (hasKeySubscriber==false)
778  continue;
779 
780  UnsubscribeFromKey(remoteCloudClient, packet->guid, keySubscriberIndex, cloudKey, specificSystems);
781  }
782 
783  if (remoteCloudClient->IsUnused())
784  {
785  SLNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
786  remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
787  }
788 }
790 {
791 // unsigned int remoteServerIndex;
792  bool objectExists;
793  //remoteServerIndex =
794  remoteServers.GetIndexFromKey(packet->guid, &objectExists);
795  if (objectExists==false)
796  return;
797 
798  SLNet::BitStream bsIn(packet->data, packet->length, false);
799  bsIn.IgnoreBytes(sizeof(MessageID)*2);
800 
801  CloudQueryWithAddresses cloudQueryWithAddresses;
802  uint32_t requestId;
803  cloudQueryWithAddresses.Serialize(false, &bsIn);
804  bsIn.Read(requestId);
805 
806  DataStructures::List<CloudData*> cloudDataResultList;
807  DataStructures::List<CloudKey> cloudKeyResultList;
808  ProcessCloudQueryWithAddresses(cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);
809 
810  SLNet::BitStream bsOut;
811  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
812  bsOut.Write((MessageID)STSC_PROCESS_GET_RESPONSE);
813  bsOut.Write(requestId);
814  WriteCloudQueryRowFromResultList(cloudDataResultList, cloudKeyResultList, &bsOut);
815  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, packet->guid, false);
816 }
818 {
819  unsigned int remoteServerIndex;
820  bool objectExists;
821  remoteServerIndex = remoteServers.GetIndexFromKey(packet->guid, &objectExists);
822  if (objectExists==false)
823  return;
824 
825  RemoteServer *remoteServer = remoteServers[remoteServerIndex];
826  if (remoteServer==0)
827  return;
828 
829  SLNet::BitStream bsIn(packet->data, packet->length, false);
830  bsIn.IgnoreBytes(sizeof(MessageID)*2);
831 
832  uint32_t requestId;
833  bsIn.Read(requestId);
834 
835  // Lookup request id
836  bool hasGetRequest;
837  unsigned int getRequestIndex;
838  getRequestIndex = getRequests.GetIndexFromKey(requestId, &hasGetRequest);
839  if (hasGetRequest==false)
840  return;
841  GetRequest *getRequest = getRequests[getRequestIndex];
842  bool hasRemoteServer;
843  unsigned int remoteServerResponsesIndex;
844  remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(packet->guid, &hasRemoteServer);
845  if (hasRemoteServer==false)
846  return;
847  BufferedGetResponseFromServer *bufferedGetResponseFromServer;
848  bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponsesIndex];
849  if (bufferedGetResponseFromServer->gotResult==true)
850  return;
851  bufferedGetResponseFromServer->gotResult=true;
852  uint32_t numRows;
853  bufferedGetResponseFromServer->queryResult.SerializeNumRows(false, numRows, &bsIn);
854  bufferedGetResponseFromServer->queryResult.SerializeCloudQueryRows(false, numRows, &bsIn, this);
855 
856  // If all results returned, then also process locally, and return to user
857  if (getRequest->AllRemoteServersHaveResponded())
858  {
859  ProcessAndTransmitGetRequest(getRequest);
860 
861  getRequest->Clear(this);
862  SLNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
863 
864  getRequests.RemoveAtIndex(getRequestIndex);
865  }
866 }
867 void CloudServer::OnClosedConnection(const SystemAddress &systemAddress, RakNetGUID rakNetGUID, PI2_LostConnectionReason lostConnectionReason )
868 {
869  (void) lostConnectionReason;
870  (void) systemAddress;
871 
872  unsigned int remoteServerIndex;
873  bool objectExists;
874  remoteServerIndex = remoteServers.GetIndexFromKey(rakNetGUID, &objectExists);
875  if (objectExists)
876  {
877  // Update remoteServerResponses by removing this server and sending the response if it is now complete
878  unsigned int getRequestIndex=0;
879  while (getRequestIndex < getRequests.Size())
880  {
881  GetRequest *getRequest = getRequests[getRequestIndex];
882  bool waitingForThisServer;
883  unsigned int remoteServerResponsesIndex = getRequest->remoteServerResponses.GetIndexFromKey(rakNetGUID, &waitingForThisServer);
884  if (waitingForThisServer)
885  {
886  getRequest->remoteServerResponses[remoteServerResponsesIndex]->Clear(this);
887  SLNet::OP_DELETE(getRequest->remoteServerResponses[remoteServerResponsesIndex], _FILE_AND_LINE_);
888  getRequest->remoteServerResponses.RemoveAtIndex(remoteServerResponsesIndex);
889 
890  if (getRequest->AllRemoteServersHaveResponded())
891  {
892  ProcessAndTransmitGetRequest(getRequest);
893  getRequest->Clear(this);
894  SLNet::OP_DELETE(getRequest, _FILE_AND_LINE_);
895 
896  getRequests.RemoveAtIndex(getRequestIndex);
897  }
898  else
899  getRequestIndex++;
900  }
901  else
902  getRequestIndex++;
903  }
904 
905  SLNet::OP_DELETE(remoteServers[remoteServerIndex],_FILE_AND_LINE_);
906  remoteServers.RemoveAtIndex(remoteServerIndex);
907  }
908 
909  DataStructures::HashIndex remoteSystemIndex = remoteSystems.GetIndexOf(rakNetGUID);
910  if (remoteSystemIndex.IsInvalid()==false)
911  {
912  RemoteCloudClient* remoteCloudClient = remoteSystems.ItemAtIndex(remoteSystemIndex);
913  unsigned int uploadedKeysIndex;
914  for (uploadedKeysIndex=0; uploadedKeysIndex < remoteCloudClient->uploadedKeys.Size(); uploadedKeysIndex++)
915  {
916  // Delete keys this system has uploaded
917  bool keyDataRepositoryExists;
918  unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->uploadedKeys[uploadedKeysIndex], &keyDataRepositoryExists);
919  if (keyDataRepositoryExists)
920  {
921  CloudDataList* cloudDataList = dataRepository[dataRepositoryIndex];
922  bool keyDataExists;
923  unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(rakNetGUID, &keyDataExists);
924  if (keyDataExists)
925  {
926  CloudData *cloudData = cloudDataList->keyData[keyDataIndex];
927  cloudDataList->uploaderCount--;
928 
929  NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudData->specificSubscribers, false );
930  NotifyClientSubscribersOfDataChange(cloudData, cloudDataList->key, cloudDataList->nonSpecificSubscribers, false );
931  NotifyServerSubscribersOfDataChange(cloudData, cloudDataList->key, false );
932 
933  cloudData->Clear();
934 
935  if (cloudData->IsUnused())
936  {
938  cloudDataList->keyData.RemoveAtIndex(keyDataIndex);
939 
940  if (cloudDataList->IsNotUploaded())
941  {
942  // Tell other servers that this key is no longer uploaded, so they do not request it from us
943  RemoveUploadedKeyFromServers(cloudDataList->key);
944  }
945 
946  if (cloudDataList->IsUnused())
947  {
948  // Tell other servers that this key is no longer uploaded, so they do not request it from us
949  RemoveUploadedKeyFromServers(cloudDataList->key);
950 
951  SLNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
952  dataRepository.RemoveAtIndex(dataRepositoryIndex);
953  }
954  }
955  }
956  }
957  }
958 
959  unsigned int subscribedKeysIndex;
960  for (subscribedKeysIndex=0; subscribedKeysIndex < remoteCloudClient->subscribedKeys.Size(); subscribedKeysIndex++)
961  {
962  KeySubscriberID* keySubscriberId;
963  keySubscriberId = remoteCloudClient->subscribedKeys[subscribedKeysIndex];
964 
965  bool keyDataRepositoryExists;
966  unsigned int keyDataRepositoryIndex = dataRepository.GetIndexFromKey(remoteCloudClient->subscribedKeys[subscribedKeysIndex]->key, &keyDataRepositoryExists);
967  if (keyDataRepositoryExists)
968  {
969  CloudDataList* cloudDataList = dataRepository[keyDataRepositoryIndex];
970  if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
971  {
972  cloudDataList->nonSpecificSubscribers.Remove(rakNetGUID);
973  --cloudDataList->subscriberCount;
974  }
975  else
976  {
977  unsigned int specificSystemIndex;
978  for (specificSystemIndex=0; specificSystemIndex < keySubscriberId->specificSystemsSubscribedTo.Size(); specificSystemIndex++)
979  {
980  bool keyDataExists;
981  unsigned int keyDataIndex = cloudDataList->keyData.GetIndexFromKey(keySubscriberId->specificSystemsSubscribedTo[specificSystemIndex], &keyDataExists);
982  if (keyDataExists)
983  {
984  CloudData *keyData = cloudDataList->keyData[keyDataIndex];
985  keyData->specificSubscribers.Remove(rakNetGUID);
986  --cloudDataList->subscriberCount;
987  }
988  }
989  }
990  }
991 
992  SLNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
993  }
994 
995  // Delete and remove from remoteSystems
996  SLNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
997  remoteSystems.RemoveAtIndex(remoteSystemIndex, _FILE_AND_LINE_);
998  }
999 }
1001 {
1002  Clear();
1003 }
1004 void CloudServer::Clear(void)
1005 {
1006  unsigned int i,j;
1007  for (i=0; i < dataRepository.Size(); i++)
1008  {
1009  CloudDataList *cloudDataList = dataRepository[i];
1010  for (j=0; j < cloudDataList->keyData.Size(); j++)
1011  {
1012  cloudDataList->keyData[j]->Clear();
1013  SLNet::OP_DELETE(cloudDataList->keyData[j], _FILE_AND_LINE_);
1014  }
1015  SLNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
1016  }
1018 
1019  for (i=0; i < remoteServers.Size(); i++)
1020  {
1022  }
1024 
1025  for (i=0; i < getRequests.Size(); i++)
1026  {
1027  GetRequest *getRequest = getRequests[i];
1028  getRequest->Clear(this);
1030  }
1032 
1035  remoteSystems.GetAsList(itemList, keyList, _FILE_AND_LINE_);
1036  for (i=0; i < itemList.Size(); i++)
1037  {
1038  RemoteCloudClient* remoteCloudClient = itemList[i];
1039  for (j=0; j < remoteCloudClient->subscribedKeys.Size(); j++)
1040  {
1041  SLNet::OP_DELETE(remoteCloudClient->subscribedKeys[j], _FILE_AND_LINE_);
1042  }
1043  SLNet::OP_DELETE(remoteCloudClient, _FILE_AND_LINE_);
1044  }
1046 }
1048 {
1049  bsOut->WriteCasted<uint32_t>(cloudKeyResultList.Size());
1050  unsigned int i;
1051  for (i=0; i < cloudKeyResultList.Size(); i++)
1052  {
1053  WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, bsOut);
1054  }
1055 }
1056 void CloudServer::WriteCloudQueryRowFromResultList(unsigned int i, DataStructures::List<CloudData*> &cloudDataResultList, DataStructures::List<CloudKey> &cloudKeyResultList, BitStream *bsOut)
1057 {
1058  CloudQueryRow cloudQueryRow;
1059  CloudData *cloudData = cloudDataResultList[i];
1060  cloudQueryRow.key=cloudKeyResultList[i];
1061  cloudQueryRow.data=cloudData->dataPtr;
1062  cloudQueryRow.length=cloudData->dataLengthBytes;
1063  cloudQueryRow.serverSystemAddress=cloudData->serverSystemAddress;
1064  cloudQueryRow.clientSystemAddress=cloudData->clientSystemAddress;
1065  cloudQueryRow.serverGUID=cloudData->serverGUID;
1066  cloudQueryRow.clientGUID=cloudData->clientGUID;
1067  cloudQueryRow.Serialize(true, bsOut, 0);
1068 }
1069 void CloudServer::NotifyClientSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, DataStructures::OrderedList<RakNetGUID, RakNetGUID> &subscribers, bool wasUpdated )
1070 {
1071  SLNet::BitStream bsOut;
1073  bsOut.Write(wasUpdated);
1074  CloudQueryRow row;
1075  row.key=key;
1076  row.data=cloudData->dataPtr;
1077  row.length=cloudData->dataLengthBytes;
1078  row.serverSystemAddress=cloudData->serverSystemAddress;
1079  row.clientSystemAddress=cloudData->clientSystemAddress;
1080  row.serverGUID=cloudData->serverGUID;
1081  row.clientGUID=cloudData->clientGUID;
1082  row.Serialize(true,&bsOut,0);
1083 
1084  unsigned int i;
1085  for (i=0; i < subscribers.Size(); i++)
1086  {
1087  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false);
1088  }
1089 }
1091 {
1092  SLNet::BitStream bsOut;
1093  bsOut.Write((MessageID) ID_CLOUD_SUBSCRIPTION_NOTIFICATION);
1094  bsOut.Write(wasUpdated);
1095  row->Serialize(true,&bsOut,0);
1096 
1097  unsigned int i;
1098  for (i=0; i < subscribers.Size(); i++)
1099  {
1100  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, subscribers[i], false);
1101  }
1102 }
1103 void CloudServer::NotifyServerSubscribersOfDataChange( CloudData *cloudData, CloudKey &key, bool wasUpdated )
1104 {
1105  // Find every server that has subscribed
1106  // Send them change notifications
1107  SLNet::BitStream bsOut;
1108  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
1109  bsOut.Write((MessageID)STSC_DATA_CHANGED);
1110  bsOut.Write(wasUpdated);
1111  CloudQueryRow row;
1112  row.key=key;
1113  row.data=cloudData->dataPtr;
1114  row.length=cloudData->dataLengthBytes;
1115  row.serverSystemAddress=cloudData->serverSystemAddress;
1116  row.clientSystemAddress=cloudData->clientSystemAddress;
1117  row.serverGUID=cloudData->serverGUID;
1118  row.clientGUID=cloudData->clientGUID;
1119  row.Serialize(true,&bsOut,0);
1120 
1121  unsigned int i;
1122  for (i=0; i < remoteServers.Size(); i++)
1123  {
1124  if (remoteServers[i]->gotSubscribedAndUploadedKeys==false || remoteServers[i]->subscribedKeys.HasData(key))
1125  {
1126  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
1127  }
1128  }
1129 }
1130 void CloudServer::AddServer(RakNetGUID systemIdentifier)
1131 {
1132  ConnectionState cs = rakPeerInterface->GetConnectionState(systemIdentifier);
1133  if (cs==IS_DISCONNECTED || cs==IS_NOT_CONNECTED)
1134  return;
1135  bool objectExists;
1136  unsigned int index = remoteServers.GetIndexFromKey(systemIdentifier,&objectExists);
1137  if (objectExists==false)
1138  {
1139  RemoteServer *remoteServer = SLNet::OP_NEW<RemoteServer>(_FILE_AND_LINE_);
1140  remoteServer->gotSubscribedAndUploadedKeys=false;
1141  remoteServer->serverAddress=systemIdentifier;
1142  remoteServers.InsertAtIndex(remoteServer, index, _FILE_AND_LINE_);
1143 
1144  SendUploadedAndSubscribedKeysToServer(systemIdentifier);
1145  }
1146 }
1147 void CloudServer::RemoveServer(RakNetGUID systemAddress)
1148 {
1149  bool objectExists;
1150  unsigned int index = remoteServers.GetIndexFromKey(systemAddress,&objectExists);
1151  if (objectExists==true)
1152  {
1155  }
1156 }
1158 {
1159  remoteServersOut.Clear(true, _FILE_AND_LINE_);
1160 
1161  unsigned int i;
1162  for (i=0; i < remoteServers.Size(); i++)
1163  {
1164  remoteServersOut.Push(remoteServers[i]->serverAddress, _FILE_AND_LINE_);
1165  }
1166 }
1167 void CloudServer::ProcessAndTransmitGetRequest(GetRequest *getRequest)
1168 {
1169  SLNet::BitStream bsOut;
1171 
1172  // BufferedGetResponseFromServer getResponse;
1173  CloudQueryResult cloudQueryResult;
1174  cloudQueryResult.cloudQuery=getRequest->cloudQueryWithAddresses.cloudQuery;
1175  cloudQueryResult.subscribeToResults=getRequest->cloudQueryWithAddresses.cloudQuery.subscribeToResults;
1176  cloudQueryResult.SerializeHeader(true, &bsOut);
1177 
1178  DataStructures::List<CloudData*> cloudDataResultList;
1179  DataStructures::List<CloudKey> cloudKeyResultList;
1180  ProcessCloudQueryWithAddresses(getRequest->cloudQueryWithAddresses, cloudDataResultList, cloudKeyResultList);
1181  bool unlimitedRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn==0;
1182 
1183  uint32_t localNumRows = (uint32_t) cloudDataResultList.Size();
1184  if (unlimitedRows==false &&
1185  localNumRows > getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex &&
1186  localNumRows - getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex > getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn )
1187  localNumRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex + getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn;
1188 
1189  BitSize_t bitStreamOffset = bsOut.GetWriteOffset();
1190  uint32_t localRowsToWrite;
1191  unsigned int skipRows;
1192  if (localNumRows>getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex)
1193  {
1194  localRowsToWrite=localNumRows-getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex;
1195  skipRows=0;
1196  }
1197  else
1198  {
1199  localRowsToWrite=0;
1200  skipRows=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex-localNumRows;
1201  }
1202  cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut);
1203  for (unsigned int i=getRequest->cloudQueryWithAddresses.cloudQuery.startingRowIndex; i < localNumRows; i++)
1204  {
1205  WriteCloudQueryRowFromResultList(i, cloudDataResultList, cloudKeyResultList, &bsOut);
1206  }
1207 
1208  // Append remote systems for remaining rows
1209  if (unlimitedRows==true || getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn>localRowsToWrite)
1210  {
1211  uint32_t remainingRows=0;
1212  uint32_t additionalRowsWritten=0;
1213  if (unlimitedRows==false)
1214  remainingRows=getRequest->cloudQueryWithAddresses.cloudQuery.maxRowsToReturn-localRowsToWrite;
1215 
1216  unsigned int remoteServerResponseIndex;
1217  for (remoteServerResponseIndex=0; remoteServerResponseIndex < getRequest->remoteServerResponses.Size(); remoteServerResponseIndex++)
1218  {
1219  BufferedGetResponseFromServer *bufferedGetResponseFromServer = getRequest->remoteServerResponses[remoteServerResponseIndex];
1220  unsigned int cloudQueryRowIndex;
1221  for (cloudQueryRowIndex=0; cloudQueryRowIndex < bufferedGetResponseFromServer->queryResult.rowsReturned.Size(); cloudQueryRowIndex++)
1222  {
1223  if (skipRows>0)
1224  {
1225  --skipRows;
1226  continue;
1227  }
1228  bufferedGetResponseFromServer->queryResult.rowsReturned[cloudQueryRowIndex]->Serialize(true, &bsOut, this);
1229 
1230  ++additionalRowsWritten;
1231  if (unlimitedRows==false && --remainingRows==0)
1232  break;
1233  }
1234 
1235  if (unlimitedRows==false && remainingRows==0)
1236  break;
1237  }
1238 
1239  if (additionalRowsWritten>0)
1240  {
1241  BitSize_t curOffset = bsOut.GetWriteOffset();
1242  bsOut.SetWriteOffset(bitStreamOffset);
1243  localRowsToWrite+=additionalRowsWritten;
1244  cloudQueryResult.SerializeNumRows(true, localRowsToWrite, &bsOut);
1245  bsOut.SetWriteOffset(curOffset);
1246  }
1247  }
1248 
1249  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, getRequest->requestingClient, false);
1250 }
1252 {
1253  CloudQueryResult cloudQueryResult;
1254  CloudQueryRow cloudQueryRow;
1255  unsigned int queryIndex;
1256  bool dataRepositoryExists;
1257  CloudDataList* cloudDataList;
1258  unsigned int keyDataIndex;
1259 
1260  // If specificSystems list empty, applies to all systems
1261  // For each of keys in cloudQueryWithAddresses, return that data, limited by maxRowsToReturn
1262  for (queryIndex=0; queryIndex < cloudQueryWithAddresses.cloudQuery.keys.Size(); queryIndex++)
1263  {
1264  const CloudKey &key = cloudQueryWithAddresses.cloudQuery.keys[queryIndex];
1265 
1266  unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(key, &dataRepositoryExists);
1267  if (dataRepositoryExists)
1268  {
1269  cloudDataList=dataRepository[dataRepositoryIndex];
1270 
1271  if (cloudDataList->uploaderCount>0)
1272  {
1273  // Return all keyData that was uploaded by specificSystems, or all if not specified
1274  if (cloudQueryWithAddresses.specificSystems.Size()>0)
1275  {
1276  // Return data for matching systems
1277  unsigned int specificSystemIndex;
1278  for (specificSystemIndex=0; specificSystemIndex < cloudQueryWithAddresses.specificSystems.Size(); specificSystemIndex++)
1279  {
1280  bool uploaderExists;
1281  keyDataIndex = cloudDataList->keyData.GetIndexFromKey(cloudQueryWithAddresses.specificSystems[specificSystemIndex], &uploaderExists);
1282  if (uploaderExists)
1283  {
1284  cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
1285  cloudKeyResultList.Push(key, _FILE_AND_LINE_);
1286  }
1287  }
1288  }
1289  else
1290  {
1291  // Return data for all systems
1292  for (keyDataIndex=0; keyDataIndex < cloudDataList->keyData.Size(); keyDataIndex++)
1293  {
1294  cloudDataResultList.Push(cloudDataList->keyData[keyDataIndex], _FILE_AND_LINE_);
1295  cloudKeyResultList.Push(key, _FILE_AND_LINE_);
1296  }
1297  }
1298  }
1299  }
1300  }
1301 }
1303 {
1304  SLNet::BitStream bsOut;
1305  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
1306  bsOut.Write((MessageID)STSC_ADD_UPLOADED_AND_SUBSCRIBED_KEYS);
1308  for (unsigned int i=0; i < dataRepository.Size(); i++)
1309  dataRepository[i]->key.Serialize(true, &bsOut);
1310 
1311  BitSize_t startOffset, endOffset;
1312  uint16_t subscribedKeyCount=0;
1313  startOffset=bsOut.GetWriteOffset();
1314  bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
1315  for (unsigned int i=0; i < dataRepository.Size(); i++)
1316  {
1317  if (dataRepository[i]->subscriberCount>0)
1318  {
1319  dataRepository[i]->key.Serialize(true, &bsOut);
1320  subscribedKeyCount++;
1321  }
1322  }
1323  endOffset=bsOut.GetWriteOffset();
1324  bsOut.SetWriteOffset(startOffset);
1325  bsOut.WriteCasted<uint16_t>(subscribedKeyCount);
1326  bsOut.SetWriteOffset(endOffset);
1327 
1328  if (dataRepository.Size()>0 || subscribedKeyCount>0)
1329  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, systemAddress, false);
1330 }
1332 {
1333  SLNet::BitStream bsOut;
1334  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
1335  bsOut.Write((MessageID)STSC_ADD_UPLOADED_KEY);
1336  cloudKey.Serialize(true, &bsOut);
1337  for (unsigned int i=0; i < remoteServers.Size(); i++)
1338  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
1339 }
1341 {
1342  SLNet::BitStream bsOut;
1343  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
1344  bsOut.Write((MessageID)STSC_ADD_SUBSCRIBED_KEY);
1345  cloudKey.Serialize(true, &bsOut);
1346  for (unsigned int i=0; i < remoteServers.Size(); i++)
1347  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
1348 }
1350 {
1351  SLNet::BitStream bsOut;
1352  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
1353  bsOut.Write((MessageID)STSC_REMOVE_UPLOADED_KEY);
1354  cloudKey.Serialize(true, &bsOut);
1355  for (unsigned int i=0; i < remoteServers.Size(); i++)
1356  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
1357 }
1359 {
1360  SLNet::BitStream bsOut;
1361  bsOut.Write((MessageID)ID_CLOUD_SERVER_TO_SERVER_COMMAND);
1362  bsOut.Write((MessageID)STSC_REMOVE_SUBSCRIBED_KEY);
1363  cloudKey.Serialize(true, &bsOut);
1364  for (unsigned int i=0; i < remoteServers.Size(); i++)
1365  SendUnified(&bsOut, HIGH_PRIORITY, RELIABLE_ORDERED, 0, remoteServers[i]->serverAddress, false);
1366 }
1368 {
1369  SLNet::BitStream bsIn(packet->data, packet->length, false);
1370  bsIn.IgnoreBytes(sizeof(MessageID)*2);
1371 
1372  bool objectExists;
1373  unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
1374  if (objectExists==false)
1375  return;
1376  RemoteServer *remoteServer = remoteServers[index];
1377  remoteServer->gotSubscribedAndUploadedKeys=true;
1378 
1379 // unsigned int insertionIndex;
1380  bool alreadyHasKey;
1381  uint16_t numUploadedKeys, numSubscribedKeys;
1382  bsIn.Read(numUploadedKeys);
1383  for (uint16_t i=0; i < numUploadedKeys; i++)
1384  {
1385  CloudKey cloudKey;
1386  cloudKey.Serialize(false, &bsIn);
1387 
1388  // insertionIndex =
1389  remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
1390  if (alreadyHasKey==false)
1391  remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
1392  }
1393 
1394  bsIn.Read(numSubscribedKeys);
1395  for (uint16_t i=0; i < numSubscribedKeys; i++)
1396  {
1397  CloudKey cloudKey;
1398  cloudKey.Serialize(false, &bsIn);
1399 
1400  //insertionIndex =
1401  remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
1402  if (alreadyHasKey==false)
1403  remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
1404  }
1405 
1406  // Potential todo - join servers
1407  // For each uploaded key that we subscribe to, query it
1408  // For each subscribed key that we have, send it
1409 }
1411 {
1412  SLNet::BitStream bsIn(packet->data, packet->length, false);
1413  bsIn.IgnoreBytes(sizeof(MessageID)*2);
1414 
1415  bool objectExists;
1416  unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
1417  if (objectExists==false)
1418  return;
1419  RemoteServer *remoteServer = remoteServers[index];
1420  CloudKey cloudKey;
1421  cloudKey.Serialize(false, &bsIn);
1422 // unsigned int insertionIndex;
1423  bool alreadyHasKey;
1424 // insertionIndex =
1425  remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
1426  if (alreadyHasKey==false)
1427  remoteServer->uploadedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
1428 }
1430 {
1431  SLNet::BitStream bsIn(packet->data, packet->length, false);
1432  bsIn.IgnoreBytes(sizeof(MessageID)*2);
1433 
1434  bool objectExists;
1435  unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
1436  if (objectExists==false)
1437  return;
1438  RemoteServer *remoteServer = remoteServers[index];
1439  CloudKey cloudKey;
1440  cloudKey.Serialize(false, &bsIn);
1441 // unsigned int insertionIndex;
1442  bool alreadyHasKey;
1443 // insertionIndex =
1444  remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
1445 
1446  // Do not need to send current values, the Get request will do that as the Get request is sent at the same time
1447  if (alreadyHasKey==false)
1448  remoteServer->subscribedKeys.Insert(cloudKey,cloudKey,true,_FILE_AND_LINE_);
1449 }
1451 {
1452  SLNet::BitStream bsIn(packet->data, packet->length, false);
1453  bsIn.IgnoreBytes(sizeof(MessageID)*2);
1454 
1455  bool objectExists;
1456  unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
1457  if (objectExists==false)
1458  return;
1459  RemoteServer *remoteServer = remoteServers[index];
1460  CloudKey cloudKey;
1461  cloudKey.Serialize(false, &bsIn);
1462  unsigned int insertionIndex;
1463  bool alreadyHasKey;
1464  insertionIndex = remoteServer->uploadedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
1465  if (alreadyHasKey==true)
1466  remoteServer->uploadedKeys.RemoveAtIndex(insertionIndex);
1467 }
1469 {
1470  SLNet::BitStream bsIn(packet->data, packet->length, false);
1471  bsIn.IgnoreBytes(sizeof(MessageID)*2);
1472 
1473  bool objectExists;
1474  unsigned int index = remoteServers.GetIndexFromKey(packet->guid,&objectExists);
1475  if (objectExists==false)
1476  return;
1477  RemoteServer *remoteServer = remoteServers[index];
1478  CloudKey cloudKey;
1479  cloudKey.Serialize(false, &bsIn);
1480  unsigned int insertionIndex;
1481  bool alreadyHasKey;
1482  insertionIndex = remoteServer->subscribedKeys.GetIndexFromKey(cloudKey, &alreadyHasKey);
1483  if (alreadyHasKey==true)
1484  remoteServer->subscribedKeys.RemoveAtIndex(insertionIndex);
1485 }
1487 {
1488  SLNet::BitStream bsIn(packet->data, packet->length, false);
1489  bsIn.IgnoreBytes(sizeof(MessageID)*2);
1490 
1491  bool objectExists;
1492  remoteServers.GetIndexFromKey(packet->guid,&objectExists);
1493  if (objectExists==false)
1494  return;
1495 
1496  // Find everyone that cares about this change and relay
1497  bool wasUpdated=false;
1498  bsIn.Read(wasUpdated);
1499  CloudQueryRow row;
1500  row.Serialize(false, &bsIn, this);
1501 
1502  CloudDataList *cloudDataList;
1503  bool dataRepositoryExists;
1504  unsigned int dataRepositoryIndex;
1505  dataRepositoryIndex = dataRepository.GetIndexFromKey(row.key, &dataRepositoryExists);
1506  if (dataRepositoryExists==false)
1507  {
1508  DeallocateRowData(row.data);
1509  return;
1510  }
1511  cloudDataList = dataRepository[dataRepositoryIndex];
1512  CloudData *cloudData;
1513  bool keyDataListExists;
1514  unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(row.clientGUID, &keyDataListExists);
1515  if (keyDataListExists==true)
1516  {
1517  cloudData = cloudDataList->keyData[keyDataListIndex];
1518  NotifyClientSubscribersOfDataChange(&row, cloudData->specificSubscribers, wasUpdated );
1519  }
1520 
1521  NotifyClientSubscribersOfDataChange(&row, cloudDataList->nonSpecificSubscribers, wasUpdated );
1522  DeallocateRowData(row.data);
1523 }
1527  )
1528 {
1529  remoteServersWithData.Clear(true, _FILE_AND_LINE_);
1530 
1531  unsigned int i,j;
1532  for (i=0; i < remoteServers.Size(); i++)
1533  {
1534  remoteServers[i]->workingFlag=false;
1535  }
1536 
1537  for (i=0; i < remoteServers.Size(); i++)
1538  {
1539  if (remoteServers[i]->workingFlag==false)
1540  {
1541  if (remoteServers[i]->gotSubscribedAndUploadedKeys==false)
1542  {
1543  remoteServers[i]->workingFlag=true;
1544  remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_);
1545  }
1546  else
1547  {
1548  remoteServers[i]->workingFlag=false;
1549  for (j=0; j < keys.Size(); j++)
1550  {
1551  if (remoteServers[i]->workingFlag==false && remoteServers[i]->uploadedKeys.HasData(keys[j]))
1552  {
1553  remoteServers[i]->workingFlag=true;
1554  remoteServersWithData.Push(remoteServers[i], _FILE_AND_LINE_);
1555  break;
1556  }
1557  }
1558  }
1559  }
1560  }
1561 }
1562 
1563 CloudServer::CloudDataList *CloudServer::GetOrAllocateCloudDataList(CloudKey key, bool *dataRepositoryExists, unsigned int &dataRepositoryIndex)
1564 {
1565  CloudDataList *cloudDataList;
1566 
1567  dataRepositoryIndex = dataRepository.GetIndexFromKey(key, dataRepositoryExists);
1568  if (*dataRepositoryExists==false)
1569  {
1570  cloudDataList = SLNet::OP_NEW<CloudDataList>(_FILE_AND_LINE_);
1571  cloudDataList->key=key;
1572  cloudDataList->uploaderCount=0;
1573  cloudDataList->subscriberCount=0;
1574  dataRepository.InsertAtIndex(cloudDataList,dataRepositoryIndex,_FILE_AND_LINE_);
1575  }
1576  else
1577  {
1578  cloudDataList = dataRepository[dataRepositoryIndex];
1579  }
1580 
1581  return cloudDataList;
1582 }
1583 
1584 void CloudServer::UnsubscribeFromKey(RemoteCloudClient *remoteCloudClient, RakNetGUID remoteCloudClientGuid, unsigned int keySubscriberIndex, CloudKey &cloudKey, DataStructures::List<RakNetGUID> &specificSystems)
1585 {
1586  KeySubscriberID* keySubscriberId = remoteCloudClient->subscribedKeys[keySubscriberIndex];
1587 
1588  // If removing specific systems, but global subscription, fail
1589  if (keySubscriberId->specificSystemsSubscribedTo.Size()==0 && specificSystems.Size()>0)
1590  return;
1591 
1592  bool dataRepositoryExists;
1593  CloudDataList *cloudDataList;
1594  unsigned int dataRepositoryIndex = dataRepository.GetIndexFromKey(cloudKey, &dataRepositoryExists);
1595  if (dataRepositoryExists==false)
1596  return;
1597 
1598  unsigned int i,j;
1599 
1600  cloudDataList = dataRepository[dataRepositoryIndex];
1601  if (specificSystems.Size()==0)
1602  {
1603  // Remove global subscriber. If returns false, have to remove specific subscribers
1604  if (cloudDataList->RemoveSubscriber(remoteCloudClientGuid)==false)
1605  {
1606  for (i=0; i < keySubscriberId->specificSystemsSubscribedTo.Size(); i++)
1607  {
1608  RemoveSpecificSubscriber(keySubscriberId->specificSystemsSubscribedTo[i], cloudDataList, remoteCloudClientGuid);
1609  }
1610  }
1611  keySubscriberId->specificSystemsSubscribedTo.Clear(true, _FILE_AND_LINE_);
1612  }
1613  else
1614  {
1615  for (j=0; j < specificSystems.Size(); j++)
1616  {
1617  unsigned int specificSystemsSubscribedToIndex;
1618  bool hasSpecificSystemsSubscribedTo;
1619  specificSystemsSubscribedToIndex=keySubscriberId->specificSystemsSubscribedTo.GetIndexFromKey(specificSystems[j], &hasSpecificSystemsSubscribedTo);
1620  if (hasSpecificSystemsSubscribedTo)
1621  {
1622  RemoveSpecificSubscriber(specificSystems[j], cloudDataList, remoteCloudClientGuid);
1623  keySubscriberId->specificSystemsSubscribedTo.RemoveAtIndex(specificSystemsSubscribedToIndex);
1624  }
1625  }
1626  }
1627 
1628  if (keySubscriberId->specificSystemsSubscribedTo.Size()==0)
1629  {
1630  SLNet::OP_DELETE(keySubscriberId, _FILE_AND_LINE_);
1631  remoteCloudClient->subscribedKeys.RemoveAtIndex(keySubscriberIndex);
1632  }
1633 
1634  if (cloudDataList->subscriberCount==0)
1636 
1637  if (cloudDataList->IsUnused())
1638  {
1639  SLNet::OP_DELETE(cloudDataList, _FILE_AND_LINE_);
1640  dataRepository.RemoveAtIndex(dataRepositoryIndex);
1641  }
1642 }
1643 void CloudServer::RemoveSpecificSubscriber(RakNetGUID specificSubscriber, CloudDataList *cloudDataList, RakNetGUID remoteCloudClientGuid)
1644 {
1645  bool keyDataListExists;
1646  unsigned int keyDataListIndex = cloudDataList->keyData.GetIndexFromKey(specificSubscriber, &keyDataListExists);
1647  if (keyDataListExists==false)
1648  return;
1649  CloudData *cloudData = cloudDataList->keyData[keyDataListIndex];
1650  bool hasSpecificSubscriber;
1651  unsigned int specificSubscriberIndex = cloudData->specificSubscribers.GetIndexFromKey(remoteCloudClientGuid, &hasSpecificSubscriber);
1652  if (hasSpecificSubscriber)
1653  {
1654  cloudData->specificSubscribers.RemoveAtIndex(specificSubscriberIndex);
1655  cloudDataList->subscriberCount--;
1656 
1657  if (cloudData->IsUnused())
1658  {
1659  SLNet::OP_DELETE(cloudData, _FILE_AND_LINE_);
1660  cloudDataList->keyData.RemoveAtIndex(keyDataListIndex);
1661  }
1662  }
1663 }
1664 
1666 {
1667  forceAddress=forcedAddress;
1668 }
1670 {
1671  if (queryFilters.GetIndexOf(filter)!=(unsigned int) -1)
1672  return;
1674 }
1676 {
1677  unsigned int index;
1678  index = queryFilters.GetIndexOf(filter);
1679  if (index != (unsigned int) -1)
1680  queryFilters.RemoveAtIndex(index);
1681 }
1683 {
1685 }
1686 
1687 #endif