Changeset 114:05d0451ae828 in finroc_plugins_tcp-java
- Timestamp:
- 08.06.2017 03:24:53 (6 years ago)
- Branch:
- 17.03
- Phase:
- public
- Files:
-
- 2 deleted
- 7 edited
Legend:
- Unmodified
- Added
- Removed
-
Peer.java
r91 r114 22 22 package org.finroc.plugins.tcp; 23 23 24 import org.finroc.core.datatype.FrameworkElementInfo;25 24 import org.finroc.core.plugin.ExternalConnection; 25 import org.finroc.core.remote.Definitions; 26 26 import org.finroc.plugins.tcp.internal.TCP; 27 27 import org.finroc.plugins.tcp.internal.TCPPeer; … … 58 58 serverListenAddress = "0.0.0.0"; 59 59 } 60 implementation = new TCPPeer(this, peerName, TCP.PeerType.FULL, FrameworkElementInfo.StructureExchange.SHARED_PORTS, networkConnection,60 implementation = new TCPPeer(this, peerName, TCP.PeerType.FULL, Definitions.StructureExchange.SHARED_PORTS, networkConnection, 61 61 preferredServerPort, tryNextPortsIfOccupied, autoConnectToAllPeers, serverListenAddress); 62 62 } … … 75 75 serverListenAddress = "0.0.0.0"; 76 76 } 77 implementation = new TCPPeer(this, peerName, TCP.PeerType.SERVER_ONLY, FrameworkElementInfo.StructureExchange.SHARED_PORTS, "",77 implementation = new TCPPeer(this, peerName, TCP.PeerType.SERVER_ONLY, Definitions.StructureExchange.SHARED_PORTS, "", 78 78 preferredServerPort, tryNextPortsIfOccupied, false, serverListenAddress); 79 79 } … … 87 87 * @param autoConnectToAllPeers Auto-connect to all peers that become known? 88 88 */ 89 public Peer(String peerName, String networkConnection, FrameworkElementInfo.StructureExchange structureExchange, boolean autoConnectToAllPeers) {89 public Peer(String peerName, String networkConnection, Definitions.StructureExchange structureExchange, boolean autoConnectToAllPeers) { 90 90 super("TCP", networkConnection); 91 91 implementation = new TCPPeer(this, peerName, TCP.PeerType.CLIENT_ONLY, structureExchange, networkConnection, -1, false, autoConnectToAllPeers, ""); -
TCPSettings.java
r101 r114 50 50 public static final int DEQUEUE_QUEUE_SIZE = 50; 51 51 52 /** Maximum not acknowledged Packet */53 public static final int MAX_NOT_ACKNOWLEDGED_PACKETS = 0x1F; // 32 (2^x for fast modulo)54 55 /** Packets considered when calculating avergage ping time */56 public static final int AVG_PING_PACKETS = 0x7; // 8 (2^x for fast modulo)57 58 52 /** Help for debugging: insert checks in data stream => more bandwidth */ 59 53 public static final boolean DEBUG_TCP = true; -
internal/PeerInfo.java
r109 r114 57 57 public ArrayList<InetAddress> addresses = new ArrayList<InetAddress>(); 58 58 59 /** Are we currently connected with this peer? */60 public volatile boolean connected;61 62 59 /** True, if there are ongoing attempts to connect to this peer */ 63 60 public volatile boolean connecting; … … 88 85 } 89 86 87 /** Whether there is currently a connection to this peer */ 88 public boolean isConnected() { 89 return remotePart != null && remotePart.isConnected(); 90 } 91 90 92 public String toString() { 91 93 return name != null ? (name + " (" + uuid.toString() + ")") : uuid.toString(); -
internal/RemotePart.java
r107 r114 22 22 package org.finroc.plugins.tcp.internal; 23 23 24 import java.util.ArrayList;25 import java.util.HashMap;26 import java.util.List;27 24 import java.util.concurrent.atomic.AtomicInteger; 28 import java.util.concurrent.atomic.AtomicLong;29 25 30 26 import org.finroc.core.FrameworkElement; 31 import org.finroc.core.FrameworkElementFlags;32 import org.finroc.core.FrameworkElementTags;33 import org.finroc.core.LockOrderLevels;34 27 import org.finroc.core.RuntimeEnvironment; 35 import org.finroc.core.admin.AdminClient; 36 import org.finroc.core.admin.AdministrationService; 37 import org.finroc.core.datatype.FrameworkElementInfo; 38 import org.finroc.core.port.AbstractPort; 39 import org.finroc.core.port.PortCreationInfo; 40 import org.finroc.core.port.cc.CCPortBase; 41 import org.finroc.core.port.cc.CCPortDataManager; 42 import org.finroc.core.port.cc.CCPortDataManagerTL; 43 import org.finroc.core.port.cc.CCPullRequestHandler; 44 import org.finroc.core.port.net.NetPort; 45 import org.finroc.core.port.rpc.RPCInterfaceType; 46 import org.finroc.core.port.rpc.internal.AbstractCall; 47 import org.finroc.core.port.rpc.internal.RPCMessage; 48 import org.finroc.core.port.rpc.internal.RPCPort; 49 import org.finroc.core.port.rpc.internal.RPCRequest; 50 import org.finroc.core.port.rpc.internal.RPCResponse; 51 import org.finroc.core.port.rpc.internal.AbstractCall.CallType; 52 import org.finroc.core.port.std.PortBase; 53 import org.finroc.core.port.std.PortDataManager; 54 import org.finroc.core.port.std.PullRequestHandler; 55 import org.finroc.core.portdatabase.CCType; 56 import org.finroc.core.portdatabase.FinrocTypeInfo; 57 import org.finroc.core.remote.ModelNode; 58 import org.finroc.core.remote.RemoteFrameworkElement; 59 import org.finroc.core.remote.RemotePort; 60 import org.finroc.core.remote.RemoteRuntime; 61 import org.finroc.core.remote.RemoteTypes; 62 import org.finroc.plugins.tcp.internal.TCP.OpCode; 28 import org.finroc.core.net.generic_protocol.Connection; 29 import org.finroc.core.net.generic_protocol.RemoteRuntime; 30 import org.finroc.core.remote.BufferedModelChanges; 31 import org.finroc.core.remote.Definitions; 32 import org.finroc.core.remote.ModelHandler; 63 33 import org.rrlib.logging.Log; 64 34 import org.rrlib.logging.LogLevel; 65 35 import org.rrlib.serialization.BinaryInputStream; 66 import org.rrlib.serialization.MemoryBuffer;67 import org.rrlib.serialization.Serialization;68 import org.rrlib.serialization.compression.Compressible;69 import org.rrlib.serialization.rtti.DataTypeBase;70 36 71 37 /** … … 75 41 * It creates a proxy port for each shared port in the remote runtime. 76 42 */ 77 public class RemotePart extends FrameworkElement implements PullRequestHandler, CCPullRequestHandler{43 public class RemotePart extends RemoteRuntime { 78 44 79 45 /** Peer info that this part belongs to */ … … 83 49 final TCPPeer peerImplementation; 84 50 85 /** Connection to transfer "express" ports data */86 TCPConnection expressConnection;87 88 /** Connection to transfer "bulk" ports data */89 TCPConnection bulkConnection;90 91 /** Connection to transfer all other "management" data */92 TCPConnection managementConnection;93 94 /** Structure information to send to remote part */95 private FrameworkElementInfo.StructureExchange sendStructureInfo = FrameworkElementInfo.StructureExchange.NONE;96 97 /** Framework element that contains all global links - possibly NULL */98 private FrameworkElement globalLinks;99 100 /** Framework element that contains all server ports - possibly NULL */101 private FrameworkElement serverPorts;102 103 /**104 * Lookup for remote framework elements (currently not ports) - similar to remote CoreRegister105 * (should only be accessed by reader thread of management connection)106 */107 private HashMap<Integer, ProxyPort> remotePortRegister = new HashMap<Integer, ProxyPort>();108 109 /** Temporary buffer for match checks (only used by bulk reader or connector thread) */110 private final StringBuilder tmpMatchBuffer = new StringBuilder();111 112 /** Administration interface client port */113 private final AdminClient adminInterface;114 115 /** Remote part's current model node */116 private RemoteRuntime currentModelNode;117 118 /**119 * Remote part's new model node. Is created on new connection.120 * CurrentModelNode is replaced with this, as soon as connection is fully initialized.121 * As this is not published yet, we can operate on this with any thread.122 */123 private RemoteRuntime newModelNode;124 125 /** List with remote ports that have not been initialized yet */126 private ArrayList<ProxyPort> uninitializedRemotePorts = new ArrayList<ProxyPort>();127 128 /** Next call id to assign to sent call */129 AtomicLong nextCallId = new AtomicLong();130 131 /** Pull calls that wait for a response */132 private ArrayList<PullCall> pullCallsAwaitingResponse = new ArrayList<PullCall>();133 134 51 /** Number of times disconnect was called, since last connect */ 135 52 private final AtomicInteger disconnectCalls = new AtomicInteger(0); 136 53 137 /** Has remote part compression support? */138 private boolean hasCompressionSupport = false;139 140 54 141 55 /** Peer info this part is associated with */ 142 RemotePart(PeerInfo peerInfo, FrameworkElement parent, TCPPeer peerImplementation ) {143 super(parent, peerInfo.toString() );56 RemotePart(PeerInfo peerInfo, FrameworkElement parent, TCPPeer peerImplementation, int partnerHandleStampWidth) { 57 super(parent, peerInfo.toString(), peerImplementation.structureExchange == Definitions.StructureExchange.FINSTRUCT, partnerHandleStampWidth); 144 58 this.peerInfo = peerInfo; 145 59 this.peerImplementation = peerImplementation; 146 adminInterface = (peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.FINSTRUCT) ?147 new AdminClient("AdminClient " + getName(), peerImplementation.connectionElement) : null;148 60 } 149 61 150 /** 151 * Add connection for this remote part 152 * 153 * @param connection Connection to add (with flags set) 154 * @return Did this succeed? (fails if there already is a connection for specified types of data; may happen if two parts try to connect at the same time - only one connection is kept) 155 */ 156 boolean addConnection(TCPConnection connection) { 157 if (connection.bulk) { 158 if (bulkConnection != null) { 159 return false; 160 } 161 bulkConnection = connection; 162 } else { 163 if (expressConnection != null || managementConnection != null) { 164 return false; 165 } 166 expressConnection = connection; 167 managementConnection = connection; 168 } 169 170 if (bulkConnection != null && expressConnection != null && managementConnection != null) { 171 peerInfo.connected = true; 172 Log.log(LogLevel.DEBUG, this, "Connected to " + peerInfo.toString()); 173 } 174 175 return true; 62 /** Whether there is currently a connection to this peer */ 63 public boolean isConnected() { 64 return getPrimaryConnection() != null && (!((TCPConnection)getPrimaryConnection()).isDisconnected()); 176 65 } 177 66 178 /** 179 * Called during initial structure exchange 180 * 181 * @param info Info on another remote framework element 182 * @param initalStructure Is this call originating from initial structure exchange? 183 * @param remoteRuntime Remote runtime object to add structure to 184 */ 185 void addRemoteStructure(FrameworkElementInfo info, boolean initalStructureExchange) { 186 RemoteRuntime remoteRuntime = initalStructureExchange ? newModelNode : currentModelNode; 187 long startTime = 0; 188 while (remoteRuntime == null && (!initalStructureExchange)) { 189 remoteRuntime = currentModelNode; 190 Log.log(LogLevel.DEBUG, this, "Waiting for remote model to become ready"); 191 if (startTime == 0) { 192 startTime = System.currentTimeMillis(); 193 } else if (System.currentTimeMillis() - startTime > 1000) { // We should not block thread forever (as this blocks GUI interaction in e.g. finstruct) 194 throw new RuntimeException("No model node available"); 195 } 196 try { 197 Thread.sleep(20); 198 } catch (InterruptedException e) {} 199 } 200 Log.log(LogLevel.DEBUG_VERBOSE_1, this, "Adding element: " + info.toString()); 201 if (info.isPort()) { 202 if ((!hasCompressionSupport) && info.getDataType().getName().equals("finroc.data_compression.CompressionRules")) { 203 hasCompressionSupport = true; 204 } 205 ProxyPort port = new ProxyPort(info); 206 for (int i = 0; i < info.getLinkCount(); i++) { 207 RemoteFrameworkElement remoteElement = new RemotePort(info.getHandle(), info.getLink(i).name, port.getPort(), i); 208 if (i == 0) { 209 remoteRuntime.elementLookup.put(info.getHandle(), remoteElement); 210 } 211 remoteElement.setName(info.getLink(i).name); 212 remoteElement.setTags(info.getTags()); 213 remoteElement.setFlags(info.getFlags()); 214 ModelNode parent = getFrameworkElement(info.getLink(i).parent, remoteRuntime); 215 if (initalStructureExchange) { 216 parent.add(remoteElement); 217 } else { 218 peerImplementation.connectionElement.getModelHandler().addNode(parent, remoteElement); 219 if (peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.SHARED_PORTS) { 220 port.getPort().init(); 221 } else { 222 uninitializedRemotePorts.add(port); 223 } 224 } 225 } 226 } else { 227 RemoteFrameworkElement remoteElement = getFrameworkElement(info.getHandle(), remoteRuntime); 228 remoteElement.setName(info.getLink(0).name); 229 remoteElement.setTags(info.getTags()); 230 remoteElement.setFlags(info.getFlags()); 231 ModelNode parent = getFrameworkElement(info.getLink(0).parent, remoteRuntime); 232 if (initalStructureExchange) { 233 parent.add(remoteElement); 234 } else { 235 peerImplementation.connectionElement.getModelHandler().addNode(parent, remoteElement); 236 } 237 } 238 } 239 240 /** 241 * Creates new model of remote part 242 */ 243 void createNewModel() { 244 newModelNode = new RemoteRuntime(peerInfo.toString(), peerInfo.uuid.toString(), adminInterface, managementConnection.remoteTypes); 67 @Override 68 public void createNewModel() { 69 newModelNode = new org.finroc.core.remote.RemoteRuntime(peerInfo.toString(), peerInfo.uuid.toString(), getAdminInterface(), getPrimaryConnection().getReadBufferStream(), handleStampWidth); 245 70 newModelNode.setFlags(RuntimeEnvironment.getInstance().getAllFlags()); 246 71 } 247 72 248 /** 249 * Creates qualified link for element of remote framework element model 250 * 251 * @param remoteElement Element to create link for 252 * @return Created Link 253 */ 254 private String createPortName(RemoteFrameworkElement remoteElement) { 255 return remoteElement.getQualifiedLink(); 256 } 257 258 /** 259 * Deletes all child elements of remote part 260 */ 261 void deleteAllChildren() { 262 ChildIterator ci = new ChildIterator(this); 263 FrameworkElement child; 264 while ((child = ci.next()) != null) { 265 child.managedDelete(); 266 } 267 remotePortRegister = new HashMap<Integer, ProxyPort>(); 268 } 269 270 /** 271 * Disconnects remote part 272 */ 73 @Override 273 74 public void disconnect() { 274 75 // make sure that disconnect is only called once... prevents deadlocks cleaning up all the threads … … 279 80 280 81 synchronized (peerImplementation.connectTo) { 281 if ( managementConnection!= null) {282 managementConnection.disconnect();82 if (getPrimaryConnection() != null) { 83 getPrimaryConnection().disconnect(); 283 84 } 284 if (expressConnection != null) { 285 expressConnection.disconnect(); 286 } 287 if (bulkConnection != null) { 288 bulkConnection.disconnect(); 85 if (getExpressConnection() != null) { 86 getExpressConnection().disconnect(); 289 87 } 290 88 disconnectCalls.set(0); … … 292 90 } 293 91 294 /** 295 * @param dataRate Data Rate 296 * @return Formatted Data Rate 297 */ 298 private static String formatRate(int dataRate) { 299 if (dataRate < 1000) { 300 return "" + dataRate; 301 } else if (dataRate < 10000000) { 302 return (dataRate / 1000) + "k"; 303 } else { 304 return (dataRate / 1000000) + "M"; 305 } 306 } 307 308 /** 309 * @return Connection quality (see ExternalConnection) 310 */ 311 public float getConnectionQuality() { 312 if (bulkConnection == null || expressConnection == null || managementConnection == null || (!peerInfo.connected)) { 313 return 0; 314 } 315 float pingTime = 0; 316 for (int i = 0; i < 3; i++) { 317 TCPConnection c = (i == 0) ? managementConnection : (i == 1 ? bulkConnection : expressConnection); 318 if (c != null) { 319 if (c.pingTimeExceeed()) { 320 return 0; 321 } 322 pingTime = Math.max(pingTime, (float)c.getAvgPingTime()); 323 } 324 } 325 if (pingTime < 300) { 326 return 1; 327 } else if (pingTime > 1300) { 328 return 0; 329 } else { 330 return ((float)pingTime - 300.0f) / 1000.0f; 331 } 332 } 333 334 /** 335 * Returns framework element with specified handle. 336 * Creates one if it doesn't exist. 337 * 338 * @param handle Remote handle of framework element 339 * @param remoteRuntime Remote runtime model to use for lookup 340 * @return Framework element. 341 */ 342 public RemoteFrameworkElement getFrameworkElement(int handle, RemoteRuntime remoteRuntime) { 343 RemoteFrameworkElement remoteElement = remoteRuntime.elementLookup.get(handle); 344 if (remoteElement == null) { 345 remoteElement = new RemoteFrameworkElement(handle, "(unknown)"); 346 remoteRuntime.elementLookup.put(handle, remoteElement); 347 } 348 return remoteElement; 349 } 350 351 /** 352 * @return Framework element that contains all global links (possibly created by call to this) 353 */ 354 public FrameworkElement getGlobalLinkElement() { 355 if (globalLinks == null) { 356 globalLinks = new FrameworkElement(this, "global", Flag.NETWORK_ELEMENT | Flag.GLOBALLY_UNIQUE_LINK | Flag.ALTERNATIVE_LINK_ROOT, -1); 357 } 358 return globalLinks; 359 } 360 361 /** 362 * @return String containing ping times 363 */ 364 public String getPingString() { 365 if (!peerInfo.connected) { 366 return "disconnected"; 367 } 368 369 int pingAvg = 0; 370 int pingMax = 0; 371 int dataRate = 0; 372 String s = "ping (avg/max/Rx): "; 373 if (bulkConnection == null || expressConnection == null) { // should be disconnected... but maybe this is even safer 374 return s + "- "; 375 } 376 for (int i = 0; i < 3; i++) { 377 TCPConnection c = (i == 0) ? managementConnection : (i == 1 ? bulkConnection : expressConnection); 378 if (c != null) { 379 if (c.pingTimeExceeed()) { 380 return s + "- "; 381 } 382 pingAvg = Math.max(pingAvg, c.getAvgPingTime()); 383 pingMax = Math.max(pingMax, c.getMaxPingTime()); 384 dataRate += c.getRx(); 385 } 386 } 387 return s + pingAvg + "ms/" + pingMax + "ms/" + formatRate(dataRate); 388 } 389 390 /** 391 * Initializes part and checks for admin port to connect to 392 * 393 * @param obsoleteNode Model node that is now obsolete 394 */ 395 public void initAndCheckForAdminPort(final ModelNode obsoleteNode) { 396 397 if (peerImplementation.structureExchange != FrameworkElementInfo.StructureExchange.SHARED_PORTS) { 398 399 // expand port names 400 synchronized (getRegistryLock()) { 401 for (ProxyPort pp : remotePortRegister.values()) { 402 RemotePort[] remotePorts = RemotePort.get(pp.getPort()); 403 for (int i = 0; i < remotePorts.length; i++) { 404 pp.getPort().setName(createPortName(remotePorts[i]), i); 405 } 406 } 407 } 408 } 409 410 this.init(); 411 412 // connect to admin interface? 413 if (adminInterface != null) { 414 FrameworkElement fe = getChildElement(AdministrationService.QUALIFIED_PORT_NAME, false); 415 if (fe != null && fe.isPort() && fe.isReady()) { 416 ((AbstractPort)fe).connectTo(adminInterface.getWrapped()); 417 } else { 418 Log.log(LogLevel.ERROR, this, "Could not find administration port to connect to."); 419 } 420 } 421 422 // set remote type in RemoteRuntime Annotation 423 //((RemoteRuntime)getAnnotation(RemoteRuntime.class)).setRemoteTypes(managementConnection.updateTimes); 424 final RemoteRuntime oldModel = currentModelNode; 425 final RemoteRuntime newModel = newModelNode; 426 currentModelNode = newModelNode; 427 newModelNode = null; 428 429 if (oldModel != null) { 430 peerImplementation.connectionElement.getModelHandler().removeNode(obsoleteNode); 431 peerImplementation.connectionElement.getModelHandler().replaceNode(oldModel, newModel); 432 } else { 433 peerImplementation.connectionElement.getModelHandler().replaceNode(obsoleteNode, newModel); 434 } 435 } 436 437 /** 438 * Process message with specified opcode in provided stream 439 * 440 * @param opCode Opcode of message 441 * @param stream Stream to read message from 442 * @param remoteTypes Info on remote types 443 * @param connection Connection this was called from 444 * @param elementInfoBuffer Framework element info buffer to use 445 */ 446 public void processMessage(OpCode opCode, BinaryInputStream stream, RemoteTypes remoteTypes, TCPConnection connection) throws Exception { 447 Log.log(LogLevel.DEBUG_VERBOSE_1, this, "Processing message " + opCode.toString()); 448 449 switch (opCode) { 450 case PORT_VALUE_CHANGE: 451 case SMALL_PORT_VALUE_CHANGE: 452 case SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP: 453 int handle = stream.readInt(); 454 Serialization.DataEncoding encoding = stream.readEnum(Serialization.DataEncoding.class); 455 AbstractPort port = RuntimeEnvironment.getInstance().getPort(handle); 456 if (port != null && port.isReady() && (!FinrocTypeInfo.isMethodType(port.getDataType(), true))) { 457 NetPort netPort = port.asNetPort(); 458 if (netPort != null) { 459 netPort.receiveDataFromStream(stream, encoding, opCode != OpCode.SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP); 460 } 461 } 462 break; 463 case RPC_CALL: 464 handle = stream.readInt(); 465 CallType callType = stream.readEnum(CallType.class); 466 DataTypeBase type = stream.readType(); 467 byte methodId = stream.readByte(); 468 if (!(type instanceof RPCInterfaceType)) { 469 Log.log(LogLevel.WARNING, this, "Type " + type.getName() + " is no RPC type. Ignoring call."); 470 return; 471 } 472 RPCInterfaceType rpcInterfaceType = (RPCInterfaceType)type; 473 474 if (callType == CallType.RPC_MESSAGE || callType == CallType.RPC_REQUEST) { 475 port = RuntimeEnvironment.getInstance().getPort(handle); 476 if (port != null && rpcInterfaceType == port.getDataType()) { 477 //RPCDeserializationScope deserializationScope(message.Get<0>(), connection.rpcCallBufferPools); // TODO? 478 if (callType == CallType.RPC_MESSAGE) { 479 RPCMessage.deserializeAndExecuteCallImplementation(stream, (RPCPort)port, methodId); 480 } else { 481 RPCRequest.deserializeAndExecuteCallImplementation(stream, (RPCPort)port, methodId, connection); 482 } 483 } 484 } else { // type is RPC response 485 long callId = stream.readLong(); 486 487 AbstractCall callAwaitingThisResponse = null; 488 synchronized (connection.callsAwaitingResponse) { 489 for (TCPConnection.CallAndTimeout call : connection.callsAwaitingResponse) { 490 if (call.call.getCallId() == callId) { 491 callAwaitingThisResponse = call.call; 492 connection.callsAwaitingResponse.remove(call); 493 break; 494 } 495 } 496 } 497 if (callAwaitingThisResponse == null) { // look in other connection; TODO: think about rules which connection to use for RPCs 498 TCPConnection otherConnection = (connection != expressConnection) ? expressConnection : bulkConnection; 499 synchronized (otherConnection.callsAwaitingResponse) { 500 for (TCPConnection.CallAndTimeout call : otherConnection.callsAwaitingResponse) { 501 if (call.call.getCallId() == callId) { 502 callAwaitingThisResponse = call.call; 503 otherConnection.callsAwaitingResponse.remove(call); 504 break; 505 } 506 } 507 } 508 } 509 if (callAwaitingThisResponse != null) { 510 port = RuntimeEnvironment.getInstance().getPort(callAwaitingThisResponse.getLocalPortHandle()); 511 if (port != null) { 512 //RPCDeserializationScope deserializationScope(callAwaitingThisResponse.getLocalPortHandle(), connection.rpcCallBufferPools); // TODO? 513 RPCResponse.deserializeAndExecuteCallImplementation(stream, rpcInterfaceType.getMethod(methodId), connection, callAwaitingThisResponse); 514 return; 515 } 516 } 517 RPCResponse.deserializeAndExecuteCallImplementation(stream, rpcInterfaceType.getMethod(methodId), connection, null); 518 } 519 break; 520 case PULLCALL: 521 handle = stream.readInt(); 522 long callUid = stream.readLong(); 523 encoding = stream.readEnum(Serialization.DataEncoding.class); 524 525 SerializedTCPCommand pullCallReturn = new SerializedTCPCommand(TCP.OpCode.PULLCALL_RETURN, 8192); 526 pullCallReturn.getWriteStream().writeLong(callUid); 527 port = RuntimeEnvironment.getInstance().getPort(handle); 528 if (port != null && port.isReady() && (!FinrocTypeInfo.isMethodType(port.getDataType(), true))) { 529 pullCallReturn.getWriteStream().writeBoolean(false); 530 if (FinrocTypeInfo.isStdType(port.getDataType())) { 531 CCPortBase pb = (CCPortBase)port; 532 CCPortDataManager manager = pb.getPullInInterthreadContainerRaw(true, true); 533 pullCallReturn.getWriteStream().writeBoolean(true); 534 pullCallReturn.getWriteStream().writeType(manager.getObject().getType()); 535 manager.getTimestamp().serialize(pullCallReturn.getWriteStream()); 536 manager.getObject().serialize(pullCallReturn.getWriteStream(), encoding); 537 manager.recycle2(); 538 } else { 539 PortBase pb = (PortBase)port; 540 PortDataManager manager = pb.getPullLockedUnsafe(true, true); 541 pullCallReturn.getWriteStream().writeBoolean(true); 542 pullCallReturn.getWriteStream().writeType(manager.getType()); 543 manager.getTimestamp().serialize(pullCallReturn.getWriteStream()); 544 manager.getObject().serialize(pullCallReturn.getWriteStream(), encoding); 545 manager.releaseLock(); 546 } 547 } else { 548 pullCallReturn.getWriteStream().writeBoolean(false); 549 } 550 connection.sendCall(pullCallReturn); 551 break; 552 case PULLCALL_RETURN: 553 callUid = stream.readLong(); 554 boolean failed = stream.readBoolean(); 555 556 synchronized (pullCallsAwaitingResponse) { 557 PullCall pullCall = null; 558 for (PullCall pullCallWaiting : pullCallsAwaitingResponse) { 559 if (pullCallWaiting.callId == callUid) { 560 pullCall = pullCallWaiting; 561 break; 562 } 563 } 564 if (pullCall != null) { 565 synchronized (pullCall) { 566 if (!failed) { 567 type = stream.readType(); 568 if (pullCall.origin != null) { 569 if (pullCall.origin.getDataType() == type) { 570 PortDataManager manager = pullCall.origin.getUnusedBufferRaw(); 571 manager.getTimestamp().deserialize(stream); 572 manager.getObject().deserialize(stream, pullCall.encoding); 573 } 574 } else if (pullCall.ccResultBuffer != null) { 575 if (pullCall.ccResultBuffer.getObject().getType() == type) { 576 pullCall.ccResultBuffer.getTimestamp().deserialize(stream); 577 pullCall.ccResultBuffer.getObject().deserialize(stream, pullCall.encoding); 578 } 579 } 580 } 581 pullCall.notify(); 582 } 583 } 584 } 585 break; 586 case TYPE_UPDATE: 587 type = stream.readType(); 588 remoteTypes.setTime(type, stream.readShort()); 589 break; 590 591 // case SUBSCRIBE: 592 // // TODO 593 // SubscribeMessage message; 594 // message.deserialize(stream); 595 // 596 // // Get or create server port 597 // auto it = serverPortMap.find(message.Get<0>()); 598 // if (it != serverPortMap.end()) { 599 // dataPorts::GenericPort port = it.second; 600 // NetworkPortInfo* networkPortInfo = port.GetAnnotation<NetworkPortInfo>(); 601 // networkPortInfo.setServerSideSubscriptionData(message.Get<1>(), message.Get<2>(), message.Get<3>(), message.Get<5>()); 602 // 603 // boolean pushStrategy = message.Get<1>() > 0; 604 // boolean reversePushStrategy = message.Get<2>(); 605 // if (port.getWrapped().pushStrategy() != pushStrategy || port.getWrapped().reversePushStrategy() != reversePushStrategy) { 606 // // flags need to be changed 607 // thread::Lock lock(getStructureMutex(), false); 608 // if (lock.tryLock()) { 609 // if (port.getWrapped().pushStrategy() != pushStrategy) { 610 // port.getWrapped().setPushStrategy(pushStrategy); 611 // } 612 // if (port.getWrapped().reversePushStrategy() != reversePushStrategy) { 613 // port.getWrapped().setReversePushStrategy(reversePushStrategy); 614 // } 615 // } 616 // else { 617 // return true; // We could not obtain lock - try again later 618 // } 619 // } 620 // } 621 // else { 622 // thread::Lock lock(getStructureMutex(), false); 623 // if (lock.tryLock()) { 624 // // Create server port 625 // AbstractPort* port = RuntimeEnvironment::getInstance().getPort(message.Get<0>()); 626 // if ((!port) || (!port.isReady())) { 627 // fINROC_LOG_PRINT(DEBUG_WARNING, "Port for subscription not available"); 628 // return false; 629 // } 630 // 631 // Flags flags = Flag::NETWORK_ELEMENT | Flag::VOLATILE; 632 // if (port.isOutputPort()) { 633 // flags |= Flag::ACCEPTS_DATA; // create input port 634 // } 635 // else { 636 // flags |= Flag::OUTPUT_PORT | Flag::EMITS_DATA; // create output io port 637 // } 638 // if (sendStructureInfo != StructureExchange::SHARED_PORTS) { 639 // flags |= Flag::TOOL_PORT; 640 // } 641 // if (message.Get<1>() > 0) { 642 // flags |= Flag::PUSH_STRATEGY; 643 // } 644 // if (message.Get<2>()) { 645 // flags |= Flag::PUSH_STRATEGY_REVERSE; 646 // } 647 // 648 // dataPorts::GenericPort createdPort(port.getQualifiedName().substr(1), getServerPortsElement(), port.getDataType(), flags, message.Get<3>()); 649 // NetworkPortInfo* networkPortInfo = new NetworkPortInfo(*this, message.Get<4>(), message.Get<1>(), true, *createdPort.getWrapped()); 650 // networkPortInfo.setServerSideSubscriptionData(message.Get<1>(), message.Get<2>(), message.Get<3>(), message.Get<5>()); 651 // createdPort.addPortListenerForPointer(*networkPortInfo); 652 // createdPort.init(); 653 // createdPort.connectTo(port); 654 // serverPortMap.insert(pair<FrameworkElementHandle, dataPorts::GenericPort>(message.Get<0>(), createdPort)); 655 // fINROC_LOG_PRINT(DEBUG, "Created server port ", createdPort.getWrapped().getQualifiedName()); 656 // } 657 // else { 658 // return true; // We could not obtain lock - try again later 659 // } 660 // } 661 // break; 662 // case UNSUBSCRIBE: 663 // // TODO 664 // UnsubscribeMessage message; 665 // message.deserialize(stream); 666 // auto it = serverPortMap.find(message.Get<0>()); 667 // if (it != serverPortMap.end()) { 668 // thread::Lock lock(getStructureMutex(), false); 669 // if (lock.tryLock()) { 670 // it.second.getWrapped().managedDelete(); 671 // serverPortMap.erase(message.Get<0>()); 672 // } 673 // else { 674 // return true; // We could not obtain lock - try again later 675 // } 676 // } 677 // else { 678 // fINROC_LOG_PRINT(DEBUG_WARNING, "Port for unsubscribing not available"); 679 // return false; 680 // } 681 // break; 682 683 case PEER_INFO: 684 PeerInfo info; 92 @Override 93 public void processMessage(TCP.OpCode opCode, BinaryInputStream stream, Connection connection, BufferedModelChanges modelChanges) throws Exception { 94 if (opCode == TCP.OpCode.PEER_INFO) { 685 95 while (stream.readBoolean()) { 686 96 PeerInfo peer = peerImplementation.deserializePeerInfo(stream); … … 688 98 peerImplementation.processIncomingPeerInfo(peer); 689 99 } 690 break; 691 default: 692 throw new Exception("Opcode " + opCode.toString() + " not implemented yet."); 693 } 694 } 695 696 /** 697 * Processes buffer containing change events regarding remote runtime. 698 * Must be executed from model thread. 699 * 700 * @param structureBufferToProcess Buffer containing serialized structure changes 701 * @param remoteTypes Info on remote types 702 */ 703 public void processStructurePacket(MemoryBuffer structureBufferToProcess, RemoteTypes remoteTypes) { 704 FrameworkElementInfo info = new FrameworkElementInfo(); 705 BinaryInputStream stream = new BinaryInputStream(structureBufferToProcess, remoteTypes); 706 while (stream.moreDataAvailable()) { 707 TCP.OpCode opcode = stream.readEnum(TCP.OpCode.class); 708 if (opcode == TCP.OpCode.STRUCTURE_CREATE) { 709 info.deserialize(stream, peerImplementation.structureExchange); 710 addRemoteStructure(info, false); 711 } else if (opcode == TCP.OpCode.STRUCTURE_CHANGE) { 712 int handle = stream.readInt(); 713 int flags = stream.readInt(); 714 short strategy = stream.readShort(); 715 short updateInterval = stream.readShort(); 716 ProxyPort port = remotePortRegister.get(handle); 717 if (port != null) { 718 boolean connections = peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.FINSTRUCT; 719 if (connections) { 720 info.deserializeConnections(stream); 721 } 722 port.update(flags, strategy, updateInterval, connections ? info.copyConnections() : null, connections ? info.copyNetworkConnections() : null); 723 724 RemotePort[] modelElements = RemotePort.get(port.getPort()); 725 for (RemotePort modelElement : modelElements) { 726 modelElement.setFlags(flags); 727 } 728 } else { 729 RemoteFrameworkElement element = currentModelNode.getRemoteElement(handle); 730 if (element != null) { 731 element.setFlags(flags); 732 } 733 } 734 } else if (opcode == TCP.OpCode.STRUCTURE_DELETE) { 735 int handle = stream.readInt(); 736 ProxyPort port = remotePortRegister.get(handle); 737 if (port != null) { 738 RemotePort[] modelElements = RemotePort.get(port.getPort()); 739 port.managedDelete(); 740 for (RemotePort modelElement : modelElements) { 741 peerImplementation.connectionElement.getModelHandler().removeNode(modelElement); 742 } 743 currentModelNode.elementLookup.remove(handle); 744 uninitializedRemotePorts.remove(port); 745 } else { 746 RemoteFrameworkElement element = currentModelNode.getRemoteElement(handle); 747 if (element != null) { 748 peerImplementation.connectionElement.getModelHandler().removeNode(element); 749 currentModelNode.elementLookup.remove(handle); 750 } 751 } 752 } else { 753 Log.log(LogLevel.WARNING, this, "Received corrupted structure info. Skipping packet"); 754 return; 755 } 756 TCPConnection.checkCommandEnd(stream); 757 } 758 759 // Initialize ports whose links are now complete 760 synchronized (getRegistryLock()) { 761 for (int i = 0; i < uninitializedRemotePorts.size(); i++) { 762 ProxyPort port = uninitializedRemotePorts.get(i); 763 RemotePort[] remotePorts = RemotePort.get(port.getPort()); 764 boolean complete = true; 765 for (RemotePort remotePort : remotePorts) { 766 complete |= remotePort.isNodeAncestor(currentModelNode); 767 } 768 if (complete) { 769 for (int j = 0; j < remotePorts.length; j++) { 770 port.getPort().setName(createPortName(remotePorts[j]), j); 771 } 772 port.getPort().init(); 773 uninitializedRemotePorts.remove(i); 774 i--; 775 } 776 } 100 } else { 101 super.processMessage(opCode, stream, connection, modelChanges); 777 102 } 778 103 } 779 104 780 105 @Override 781 public boolean pullRequest(CCPortBase origin, CCPortDataManagerTL resultBuffer, boolean intermediateAssign) { 782 NetPort netport = origin.asNetPort(); 783 if (netport != null && expressConnection != null) { 784 PullCall pullCall = new PullCall(netport); 785 pullCall.ccResultBuffer = resultBuffer; 786 synchronized (pullCallsAwaitingResponse) { 787 pullCallsAwaitingResponse.add(pullCall); 788 } 789 synchronized (pullCall) { 790 expressConnection.sendCall(pullCall); 791 try { 792 pullCall.wait(1000); 793 } catch (InterruptedException e) {} 794 } 795 synchronized (pullCallsAwaitingResponse) { 796 pullCallsAwaitingResponse.remove(pullCall); 797 } 798 if (pullCall.ccPullSuccess) { 799 return true; 800 } 801 } 802 origin.getRaw(resultBuffer.getObject(), true); 803 return true; 106 public void removeConnection(Connection connection) { 107 super.removeConnection(connection); 108 peerInfo.lastConnection = System.currentTimeMillis(); 804 109 } 805 110 806 111 @Override 807 public PortDataManager pullRequest(PortBase origin, byte addLocks, boolean intermediateAssign) { 808 NetPort netport = origin.asNetPort(); 809 if (netport != null && expressConnection != null) { 810 PullCall pullCall = new PullCall(netport); 811 pullCall.origin = origin; 812 synchronized (pullCallsAwaitingResponse) { 813 pullCallsAwaitingResponse.add(pullCall); 814 } 815 synchronized (pullCall) { 816 expressConnection.sendCall(pullCall); 817 try { 818 pullCall.wait(1000); 819 } catch (InterruptedException e) {} 820 } 821 synchronized (pullCallsAwaitingResponse) { 822 pullCallsAwaitingResponse.remove(pullCall); 823 } 824 if (pullCall.resultBuffer != null) { 825 pullCall.resultBuffer.getCurrentRefCounter().setLocks((byte)(addLocks)); 826 return pullCall.resultBuffer; 827 } 828 } 829 PortDataManager pd = origin.lockCurrentValueForRead(); 830 pd.getCurrentRefCounter().addLocks((byte)(addLocks - 1)); // we already have one lock 831 return pd; 832 } 833 834 /** 835 * Removes connection for this remote part 836 * 837 * @param connection Connection to remove 838 */ 839 void removeConnection(TCPConnection connection) { 840 if (connection == managementConnection) { 841 managementConnection = null; 842 deleteAllChildren(); 843 globalLinks = null; 844 serverPorts = null; 845 uninitializedRemotePorts.clear(); 846 pullCallsAwaitingResponse.clear(); 847 if (currentModelNode != null) { 848 peerImplementation.connectionElement.getModelHandler().removeNode(currentModelNode); 849 } 850 currentModelNode = null; 851 } 852 if (connection == expressConnection) { 853 expressConnection = null; 854 } 855 if (connection == bulkConnection) { 856 bulkConnection = null; 857 } 858 if (peerInfo.connected) { 859 peerInfo.lastConnection = System.currentTimeMillis(); 860 } 861 peerInfo.connected = false; 862 } 863 864 /** 865 * @param sendStructureInfo Structure information to send to remote part 866 */ 867 void setDesiredStructureInfo(FrameworkElementInfo.StructureExchange sendStructureInfo) { 868 if (sendStructureInfo == FrameworkElementInfo.StructureExchange.NONE) { 869 return; 870 } 871 if (this.sendStructureInfo != FrameworkElementInfo.StructureExchange.NONE && this.sendStructureInfo != sendStructureInfo) { 872 Log.log(LogLevel.WARNING, this, "Desired structure info already set to " + this.sendStructureInfo.toString() + ". This is likely to cause trouble."); 873 } 874 this.sendStructureInfo = sendStructureInfo; 875 } 876 877 /** 878 * Local port that acts as proxy for ports on remote machines 879 */ 880 public class ProxyPort extends TCPPort { 881 882 /** Has port been found again after reconnect? */ 883 private boolean refound = true; 884 885 /** >= 0 when port has subscribed to server; value of current subscription */ 886 private short subscriptionStrategy = -1; 887 888 /** true, if current subscription includes reverse push strategy */ 889 private boolean subscriptionRevPush = false; 890 891 /** Update time of current subscription */ 892 private short subscriptionUpdateTime = -1; 893 894 /** Handles (remote) of port's outgoing connections */ 895 protected ArrayList<FrameworkElementInfo.ConnectionInfo> connections = new ArrayList<FrameworkElementInfo.ConnectionInfo>(); 896 897 /** Info on port's outgoing network connections */ 898 protected ArrayList<FrameworkElementInfo.NetworkConnection> networkConnections = new ArrayList<FrameworkElementInfo.NetworkConnection>(); 899 900 901 /** 902 * @param portInfo Port information 903 */ 904 public ProxyPort(FrameworkElementInfo portInfo) { 905 super(createPCI(portInfo), null /*(portInfo.getFlags() & Flag.EXPRESS_PORT) > 0 ? expressConnection : bulkConnection // bulkConnection might be null*/); 906 remoteHandle = portInfo.getHandle(); 907 remotePortRegister.put(remoteHandle, this); 908 909 super.updateFlags(portInfo.getFlags()); 910 getPort().setMinNetUpdateInterval(portInfo.getMinNetUpdateInterval()); 911 updateIntervalPartner = portInfo.getMinNetUpdateInterval(); // TODO redundant? 912 propagateStrategyFromTheNet(portInfo.getStrategy()); 913 connections = portInfo.copyConnections(); 914 networkConnections = portInfo.copyNetworkConnections(); 915 916 Log.log(LogLevel.DEBUG_VERBOSE_2, this, "Updating port info: " + portInfo.toString()); 917 for (int i = 1, n = portInfo.getLinkCount(); i < n; i++) { 918 FrameworkElement parent = portInfo.getLink(i).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 919 getPort().link(parent, portInfo.getLink(i).name); 920 } 921 FrameworkElement parent = portInfo.getLink(0).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 922 getPort().setName(portInfo.getLink(0).name); 923 parent.addChild(getPort()); 924 FrameworkElementTags.addTags(getPort(), portInfo.getTags()); 925 926 if (getPort() instanceof CCPortBase) { 927 ((CCPortBase)getPort()).setPullRequestHandler(RemotePart.this); 928 } else if (getPort() instanceof PortBase) { 929 ((PortBase)getPort()).setPullRequestHandler(RemotePart.this); 930 } 931 } 932 933 public void update(int flags, short strategy, short minNetUpdateInterval, ArrayList<FrameworkElementInfo.ConnectionInfo> newConnections, 934 ArrayList<FrameworkElementInfo.NetworkConnection> newNetworkConnections) { 935 updateFlags(flags); 936 getPort().setMinNetUpdateInterval(minNetUpdateInterval); 937 updateIntervalPartner = minNetUpdateInterval; // TODO redundant? 938 propagateStrategyFromTheNet(strategy); 939 if (newConnections != null) { 940 connections = newConnections; 941 } else if (connections.size() > 0) { 942 connections = new ArrayList<FrameworkElementInfo.ConnectionInfo>(); // create new list for thread-safety reasons 943 } 944 if (newNetworkConnections != null) { 945 networkConnections = newNetworkConnections; 946 } else if (connections.size() > 0) { 947 networkConnections = new ArrayList<FrameworkElementInfo.NetworkConnection>(); // create new list for thread-safety reasons 948 } 949 } 950 951 // /** 952 // * Is port the one that is described by this information? 953 // * 954 // * @param info Port information 955 // * @return Answer 956 // */ 957 // public boolean matches(FrameworkElementInfo info) { 958 // synchronized (getPort()) { 959 // if (remoteHandle != info.getHandle() || info.getLinkCount() != getPort().getLinkCount()) { 960 // return false; 961 // } 962 // if ((getPort().getAllFlags() & Flag.CONSTANT_FLAGS) != (info.getFlags() & Flag.CONSTANT_FLAGS)) { 963 // return false; 964 // } 965 // for (int i = 0; i < info.getLinkCount(); i++) { 966 // if (peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.SHARED_PORTS) { 967 // getPort().getQualifiedLink(tmpMatchBuffer, i); 968 // } else { 969 // tmpMatchBuffer.delete(0, tmpMatchBuffer.length()); 970 // tmpMatchBuffer.append(getPort().getLink(i).getName()); 971 // } 972 // if (!tmpMatchBuffer.equals(info.getLink(i).name)) { 973 // return false; 974 // } 975 // // parents are negligible if everything else, matches 976 // } 977 // return true; 978 // } 979 // } 980 981 // public void reset() { 982 // connection = null; // set connection to null 983 // monitored = false; // reset monitored flag 984 // refound = false; // reset refound flag 985 // propagateStrategyFromTheNet((short)0); 986 // subscriptionRevPush = false; 987 // subscriptionUpdateTime = -1; 988 // subscriptionStrategy = -1; 989 // } 990 991 // /** 992 // * Update port properties/information from received port information 993 // * 994 // * @param portInfo Port info 995 // */ 996 // private void updateFromPortInfo(FrameworkElementInfo portInfo, TCP.OpCode opCode) { 997 // synchronized (getPort().getRegistryLock()) { 998 // updateFlags(portInfo.getFlags()); 999 // getPort().setMinNetUpdateInterval(portInfo.getMinNetUpdateInterval()); 1000 // updateIntervalPartner = portInfo.getMinNetUpdateInterval(); // TODO redundant? 1001 // propagateStrategyFromTheNet(portInfo.getStrategy()); 1002 // portInfo.getConnections(connections); 1003 // 1004 // log(LogLevel.LL_DEBUG_VERBOSE_2, this, "Updating port info: " + portInfo.toString()); 1005 // if (opCode == TCP.OpCode.STRUCTURE_CREATE) { 1006 // assert(!getPort().isReady()); 1007 // for (int i = 1, n = portInfo.getLinkCount(); i < n; i++) { 1008 // FrameworkElement parent = portInfo.getLink(i).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 1009 // getPort().link(parent, portInfo.getLink(i).name); 1010 // } 1011 // FrameworkElement parent = portInfo.getLink(0).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 1012 // getPort().setName(portInfo.getLink(0).name); 1013 // parent.addChild(getPort()); 1014 // } 1015 // FrameworkElementTags.addTags(getPort(), portInfo.getTags()); 1016 // 1017 // checkSubscription(); 1018 // } 1019 // } 1020 1021 @Override 1022 protected void prepareDelete() { 1023 remotePortRegister.remove(remoteHandle); 1024 getPort().disconnectAll(); 1025 checkSubscription(); 1026 super.prepareDelete(); 1027 } 1028 1029 @Override 1030 protected void connectionRemoved() { 1031 checkSubscription(); 1032 } 1033 1034 @Override 1035 protected void connectionAdded() { 1036 checkSubscription(); 1037 } 1038 1039 @Override 1040 protected void propagateStrategyOverTheNet() { 1041 checkSubscription(); 1042 } 1043 1044 @Override 1045 protected void checkSubscription() { 1046 if (FinrocTypeInfo.isMethodType(getPort().getDataType(), true)) { 1047 return; 1048 } 1049 1050 synchronized (getPort().getRegistryLock()) { 1051 AbstractPort p = getPort(); 1052 boolean revPush = p.isInputPort() && (p.isConnectedToReversePushSources() || p.getOutgoingConnectionCount() > 0); 1053 short time = getUpdateIntervalForNet(); 1054 short strategy = p.isInputPort() ? 0 : p.getStrategy(); 1055 if (!p.isConnected()) { 1056 strategy = -1; 1057 } 1058 1059 TCPConnection c = connection; 1060 1061 if (c == null) { 1062 subscriptionStrategy = -1; 1063 subscriptionRevPush = false; 1064 subscriptionUpdateTime = -1; 1065 } else if (strategy == -1 && subscriptionStrategy > -1) { // disconnect 1066 //System.out.println("Unsubscribing " + (((long)remoteHandle) + (1L << 32L)) + " " + getPort().getQualifiedName()); 1067 c.unsubscribe(remoteHandle); 1068 subscriptionStrategy = -1; 1069 subscriptionRevPush = false; 1070 subscriptionUpdateTime = -1; 1071 } else if (strategy == -1) { 1072 // still disconnected 1073 } else if (strategy != subscriptionStrategy || time != subscriptionUpdateTime || revPush != subscriptionRevPush) { 1074 boolean requestCompressedData = hasCompressionSupport && getNetworkEncoding() == Serialization.DataEncoding.BINARY && (!peerInfo.uuid.hostName.equals(peerImplementation.thisPeer.uuid)) && 1075 isStdType() && getPort().getDataType().getJavaClass() != null && Compressible.class.isAssignableFrom(getPort().getDataType().getJavaClass()); 1076 1077 c.subscribe(remoteHandle, strategy, revPush, time, p.getHandle(), requestCompressedData ? Serialization.DataEncoding.BINARY_COMPRESSED : getNetworkEncoding()); 1078 //c.subscribe(remoteHandle, strategy, revPush, time, p.getHandle(), getNetworkEncoding()); 1079 subscriptionStrategy = strategy; 1080 subscriptionRevPush = revPush; 1081 subscriptionUpdateTime = time; 1082 } 1083 setMonitored(publishPortDataOverTheNet() && getPort().isConnected()); 1084 } 1085 } 1086 1087 @Override 1088 public int getRemoteEdgeDestinations(List<AbstractPort> result) { 1089 result.clear(); 1090 for (int i = 0; i < connections.size(); i++) { 1091 ProxyPort pp = remotePortRegister.get(connections.get(i).handle); 1092 if (pp != null) { 1093 result.add(pp.getPort()); 1094 } 1095 } 1096 int numberOfReverseConnections = 0; 1097 if (getExtraEdgeProvider() != null) { 1098 numberOfReverseConnections = getExtraEdgeProvider().getRemoteEdgeDestinations(result); 1099 } 1100 for (FrameworkElementInfo.NetworkConnection connection : networkConnections) { 1101 AbstractPort port = connection.getDestinationPort(currentModelNode); 1102 if (port != null) { 1103 if (connection.isDestinationSource()) { 1104 numberOfReverseConnections++; 1105 result.add(port); 1106 } else { 1107 result.add(result.size() - numberOfReverseConnections, port); 1108 } 1109 } 1110 } 1111 return result.size() - numberOfReverseConnections; 1112 } 1113 1114 @Override 1115 protected void postChildInit() { 1116 this.connection = (getPort().getFlag(FrameworkElementFlags.EXPRESS_PORT) || 1117 (getPort().getDataType().getJavaClass() != null && CCType.class.isAssignableFrom(getPort().getDataType().getJavaClass()))) ? expressConnection : bulkConnection; 1118 super.postChildInit(); 1119 } 1120 } 1121 1122 /** 1123 * (Belongs to ProxyPort) 1124 * 1125 * Create Port Creation info from PortInfo class. 1126 * Except from Shared flag port will be identical to original port. 1127 * 1128 * @param portInfo Port Information 1129 * @return Port Creation info 1130 */ 1131 private static PortCreationInfo createPCI(FrameworkElementInfo portInfo) { 1132 PortCreationInfo pci = new PortCreationInfo(portInfo.getFlags()); 1133 pci.flags = portInfo.getFlags(); 1134 1135 // set queue size 1136 pci.maxQueueSize = portInfo.getStrategy(); 1137 1138 pci.dataType = portInfo.getDataType(); 1139 pci.lockOrder = LockOrderLevels.REMOTE_PORT; 1140 1141 return pci; 1142 } 1143 1144 /** 1145 * Pull call storage and management 1146 */ 1147 class PullCall extends SerializedTCPCommand { 1148 1149 PullCall(NetPort netport) { 1150 super(TCP.OpCode.PULLCALL, 16); 1151 timeSent = System.currentTimeMillis(); 1152 callId = nextCallId.incrementAndGet(); 1153 encoding = netport.getNetworkEncoding(); 1154 getWriteStream().writeInt(netport.getRemoteHandle()); 1155 getWriteStream().writeLong(callId); 1156 getWriteStream().writeEnum(encoding); 1157 } 1158 1159 /** Time when call was created/sent */ 1160 final long timeSent; 1161 1162 /** Call id of pull call */ 1163 final long callId; 1164 1165 /** Port call originates from - in case of a standard port */ 1166 PortBase origin; 1167 1168 /** Buffer with result */ 1169 PortDataManager resultBuffer; 1170 1171 /** Result buffer for CC port */ 1172 CCPortDataManagerTL ccResultBuffer; 1173 1174 /** Was CC Pull successful? */ 1175 boolean ccPullSuccess; 1176 1177 /** Data encoding to use */ 1178 Serialization.DataEncoding encoding; 112 public ModelHandler getModelHandler() { 113 return peerImplementation.connectionElement.getModelHandler(); 1179 114 } 1180 115 } -
internal/TCP.java
r95 r114 29 29 import org.finroc.core.RuntimeEnvironment; 30 30 import org.finroc.core.RuntimeSettings; 31 import org.finroc.core. datatype.FrameworkElementInfo;31 import org.finroc.core.net.generic_protocol.Definitions; 32 32 import org.finroc.core.parameter.ConstructorParameters; 33 33 import org.finroc.core.parameter.StaticParameterList; … … 45 45 * Plugin for P2P TCP connections 46 46 */ 47 public class TCP implements Plugin {47 public class TCP extends Definitions implements Plugin { 48 48 49 49 /** Singleton instance of TCP plugin */ … … 58 58 59 59 /** 60 * Protocol OpCodes60 * Flags for connection properties 61 61 */ 62 public enum OpCode { 63 64 // Opcodes for management connection 65 SUBSCRIBE, // Subscribe to data port 66 UNSUBSCRIBE, // Unsubscribe from data port 67 PULLCALL, // Pull call 68 PULLCALL_RETURN, // Returning pull call 69 RPC_CALL, // RPC call 70 TYPE_UPDATE, // Update on remote type info (typically desired update time) 71 STRUCTURE_CREATE, // Update on remote framework elements: Element created 72 STRUCTURE_CHANGE, // Update on remote framework elements: Port changed 73 STRUCTURE_DELETE, // Update on remote framework elements: Element deleted 74 PEER_INFO, // Information about other peers 75 76 // Change event opcodes (from subscription - or for plain setting of port) 77 PORT_VALUE_CHANGE, // normal variant 78 SMALL_PORT_VALUE_CHANGE, // variant with max. 256 byte message length (3 bytes smaller than normal variant) 79 SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP, // variant with max. 256 byte message length and no timestamp (11 bytes smaller than normal variant) 80 81 // Used for messages without opcode 82 OTHER 83 } 84 85 public enum MessageSize { 86 FIXED, // fixed size message 87 VARIABLE_UP_TO_255_BYTE, // variable message size up to 255 bytes 88 VARIABLE_UP_TO_4GB // variable message size up to 4GB 89 }; 62 public static final int 63 PRIMARY_CONNECTION = 0x1, //<! This connection is used to transfer management data (e.g. available ports, peer info, subscriptions, rpc calls) 64 EXPRESS_DATA = 0x2, //<! This connection transfers "express data" (port values of express ports) - candidate for UDP in the future 65 BULK_DATA = 0x4, //<! This connection transfers "bulk data" (port values of bulk ports) - candidate for UDP in the future 66 JAVA_PEER = 0x8, //<! Sent by Java Peers on connection initialization 67 NO_DEBUG = 0x10; //<! No debug information in protocol 90 68 91 69 /** Mode of peer */ … … 95 73 FULL // Peer is client and server 96 74 } 97 98 /**99 * Flags for connection properties100 */101 public static final int102 MANAGEMENT_DATA = 0x1, //<! This connection is used to transfer management data (e.g. available ports, peer info, subscriptions, rpc calls)103 EXPRESS_DATA = 0x2, //<! This connection transfers "express data" (port values of express ports) - candidate for UDP in the future104 BULK_DATA = 0x4; //<! This connection transfers "bulk data" (port values of bulk ports) - candidate for UDP in the future105 106 /** Message size encodings of different kinds of op codes */107 public static final MessageSize[] MESSAGE_SIZES = new MessageSize[] {108 MessageSize.FIXED, // SUBSCRIBE109 MessageSize.FIXED, // UNSUBSCRIBE110 MessageSize.FIXED, // PULLCALL111 MessageSize.VARIABLE_UP_TO_4GB, // PULLCALL_RETURN112 MessageSize.VARIABLE_UP_TO_4GB, // RPC_CALL113 MessageSize.VARIABLE_UP_TO_4GB, // UPDATE_TIME114 MessageSize.VARIABLE_UP_TO_4GB, // STRUCTURE_CREATE115 MessageSize.VARIABLE_UP_TO_4GB, // STRUCTURE_CHANGE116 MessageSize.FIXED, // STRUCTURE_DELETE117 MessageSize.VARIABLE_UP_TO_4GB, // PEER_INFO118 MessageSize.VARIABLE_UP_TO_4GB, // PORT_VALUE_CHANGE119 MessageSize.VARIABLE_UP_TO_255_BYTE, // SMALL_PORT_VALUE_CHANGE120 MessageSize.VARIABLE_UP_TO_255_BYTE, // SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP121 MessageSize.VARIABLE_UP_TO_4GB, // OTHER122 };123 75 124 76 /** Return Status */ … … 131 83 public static final String GREET_MESSAGE = "Greetings! I am a Finroc TCP peer."; 132 84 133 /** Finroc TCP protocol version */134 public static final short PROTOCOL_VERSION = 1;135 136 /** Inserted at the end of messages to debug TCP stream */137 public static final byte DEBUG_TCP_NUMBER = (byte)0xCD;138 139 85 /** Standard TCP connection creator */ 140 public static final CreateAction creator1 = new CreateAction( FrameworkElementInfo.StructureExchange.COMPLETE_STRUCTURE, "TCP", 0);86 public static final CreateAction creator1 = new CreateAction(Definitions.StructureExchange.COMPLETE_STRUCTURE, "TCP", 0); 141 87 142 88 /** Alternative TCP connection creator */ 143 public static final CreateAction creator2 = new CreateAction( FrameworkElementInfo.StructureExchange.SHARED_PORTS, TCP_PORTS_ONLY_NAME, 0);89 public static final CreateAction creator2 = new CreateAction(Definitions.StructureExchange.SHARED_PORTS, TCP_PORTS_ONLY_NAME, 0); 144 90 145 91 /** Complete TCP connection creator */ 146 public static final CreateAction creator3 = new CreateAction( FrameworkElementInfo.StructureExchange.FINSTRUCT, "TCP finstruct", CreateExternalConnectionAction.REMOTE_EDGE_INFO);92 public static final CreateAction creator3 = new CreateAction(Definitions.StructureExchange.FINSTRUCT, "TCP finstruct", CreateExternalConnectionAction.REMOTE_EDGE_INFO); 147 93 148 94 … … 198 144 199 145 /** Desired structure exchange */ 200 private final FrameworkElementInfo.StructureExchange structureExchange;146 private final Definitions.StructureExchange structureExchange; 201 147 202 148 /** Name of connection type */ … … 209 155 private final String group; 210 156 211 public CreateAction( FrameworkElementInfo.StructureExchange structureExchange, String name, int flags) {157 public CreateAction(Definitions.StructureExchange structureExchange, String name, int flags) { 212 158 this.structureExchange = structureExchange; 213 159 this.name = name; -
internal/TCPConnection.java
r110 r114 35 35 import org.rrlib.finroc_core_utils.jc.AtomicDoubleInt; 36 36 import org.rrlib.finroc_core_utils.jc.HasDestructor; 37 import org.rrlib.finroc_core_utils.jc.MutexLockOrder;38 import org.rrlib.finroc_core_utils.jc.Time;39 import org.rrlib.finroc_core_utils.jc.container.SafeConcurrentlyIterableList;40 37 import org.rrlib.finroc_core_utils.jc.container.WonderQueue; 41 38 import org.rrlib.logging.Log; … … 45 42 import org.rrlib.serialization.InputStreamSource; 46 43 import org.rrlib.serialization.MemoryBuffer; 47 import org.rrlib.serialization.Serialization ;44 import org.rrlib.serialization.SerializationInfo; 48 45 import org.rrlib.serialization.rtti.DataTypeBase; 49 46 50 import org.finroc.core.LockOrderLevels;51 import org.finroc.core.RuntimeEnvironment;52 import org.finroc.core.RuntimeSettings;53 47 import org.finroc.core.datatype.CoreString; 54 import org.finroc.core.datatype.FrameworkElementInfo; 55 import org.finroc.core.parameter.ParameterNumeric; 48 import org.finroc.core.net.generic_protocol.Connection; 49 import org.finroc.core.net.generic_protocol.Definitions; 50 import org.finroc.core.net.generic_protocol.RemoteProxyPort; 51 import org.finroc.core.net.generic_protocol.SerializedMessage; 56 52 import org.finroc.core.port.AbstractPort; 57 53 import org.finroc.core.port.ThreadLocalCache; 58 import org.finroc.core.port.net.UpdateTimeChangeListener;59 54 import org.finroc.core.port.rpc.FutureStatus; 60 55 import org.finroc.core.port.rpc.internal.AbstractCall; 61 import org.finroc.core.port.rpc.internal.ResponseSender; 56 import org.finroc.core.remote.BufferedModelChanges; 57 import org.finroc.core.remote.ModelHandler; 62 58 import org.finroc.core.remote.ModelNode; 63 import org.finroc.core.remote.RemoteType s;59 import org.finroc.core.remote.RemoteType; 64 60 import org.finroc.core.thread.CoreLoopThreadBase; 65 61 import org.finroc.plugins.tcp.TCPSettings; 66 import org.finroc.plugins.tcp.internal.TCP.OpCode;67 62 68 63 /** … … 73 68 * (writer and listener members need to be initialized by subclass) 74 69 */ 75 class TCPConnection implements UpdateTimeChangeListener, ResponseSender{70 public class TCPConnection extends Connection { 76 71 77 72 /** Network Socket used for accessing remote Server */ 78 73 private final Socket socket; 79 74 80 /** default connection times of connection partner */81 final RemoteTypes remoteTypes = new RemoteTypes();82 83 75 /** Socket's output stream */ 84 76 private final OutputStream socketOutputStream; 85 77 86 /** Buffer for writing data to stream */87 private final MemoryBuffer writeBuffer = new MemoryBuffer(MemoryBuffer.DEFAULT_SIZE, MemoryBuffer.DEFAULT_RESIZE_FACTOR, false);88 89 /** Output Stream for sending data to remote Server */90 private final BinaryOutputStream writeBufferStream = new BinaryOutputStream(writeBuffer, remoteTypes);91 92 /** Input Stream for receiving data ro remote Server */93 private final BinaryInputStream readBufferStream;94 95 /** Reference to remote part this connection belongs to */96 private final RemotePart remotePart;97 98 78 /** Writer Thread */ 99 79 private Writer writer; … … 102 82 private Reader reader; 103 83 104 /** List with calls that wait for a response */105 final ArrayList<CallAndTimeout> callsAwaitingResponse = new ArrayList<CallAndTimeout>();106 107 /** References to Connection parameters */108 private ParameterNumeric<Integer> minUpdateInterval;109 private ParameterNumeric<Integer> maxNotAcknowledgedPackets;110 111 /** Index of last acknowledged sent packet */112 private volatile int lastAcknowledgedPacket = 0;113 114 /** Index of last acknowledgement request that was received */115 private volatile int lastAckRequestIndex = 0;116 117 /** Timestamp of when packet n was sent (Index is n % MAX_NOT_ACKNOWLEDGED_PACKETS => efficient and safe implementation (ring queue)) */118 private final long[] sentPacketTime = new long[TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS + 1];119 120 /** Ping time for last packages (Index is n % AVG_PING_PACKETS => efficient and safe implementation (ring queue)) */121 private final int[] pingTimes = new int[TCPSettings.AVG_PING_PACKETS + 1];122 123 /** Ping time statistics */124 private volatile int avgPingTime, maxPingTime;125 126 /** Signal for disconnecting */127 private volatile boolean disconnectSignal = false;128 129 /** Connection type - BULK or EXPRESS */130 protected final boolean bulk;131 132 /** Ports that are monitored for changes by this connection and should be checked for modifications */133 protected SafeConcurrentlyIterableList<TCPPort> monitoredPorts = new SafeConcurrentlyIterableList<TCPPort>(50, 4);134 135 84 /** TCPPeer that this connection belongs to (null if it does not belong to a peer) */ 136 85 protected final TCPPeer peer; … … 139 88 protected int lastPeerInfoSentRevision = -1; 140 89 141 /** Rx related: last time RX was retrieved */ 142 protected long lastRxTimestamp = 0; 143 144 /** Rx related: last time RX was retrieved: how much have we received in total? */ 145 protected long lastRxPosition = 0; 146 147 /** Needs to be locked after framework elements, but before runtime registry */ 148 public final MutexLockOrder objMutex = new MutexLockOrder(LockOrderLevels.REMOTE + 1); 149 150 /** Log description for connection */ 151 protected String description; 152 153 /** Temporary buffer with port information */ 154 private final FrameworkElementInfo tempFrameworkElementInfo = new FrameworkElementInfo(); 155 90 /** Whether connection has been disconnected - or is disconnected */ 91 private volatile boolean disconnected = false; 156 92 157 93 /** … … 169 105 this.socket = socket; 170 106 this.socketOutputStream = socket.getOutputStream(); 171 this.readBufferStream = new BinaryInputStream(new InputStreamSource(socket.getInputStream()), remoteTypes);107 this.readBufferStream = new BinaryInputStream(new InputStreamSource(socket.getInputStream()), INITIAL_SERIALIZATION_INFO); 172 108 173 109 // Initialize connection 174 110 // Write... 175 111 writeBufferStream.writeString(TCP.GREET_MESSAGE); 176 writeBufferStream.writeShort( TCP.PROTOCOL_VERSION);112 writeBufferStream.writeShort(Definitions.PROTOCOL_VERSION_MAJOR); 177 113 writeBufferStream.writeSkipOffsetPlaceholder(); 178 114 // ConnectionInitMessage … … 181 117 writeBufferStream.writeString(peer.thisPeer.name); 182 118 writeBufferStream.writeEnum(peer.structureExchange); 183 writeBufferStream.writeInt(( connectionFlags & TCP.BULK_DATA) != 0 ? TCP.BULK_DATA : (TCP.EXPRESS_DATA | TCP.MANAGEMENT_DATA));119 writeBufferStream.writeInt(((connectionFlags & TCP.PRIMARY_CONNECTION) != 0 ? (TCP.BULK_DATA | TCP.PRIMARY_CONNECTION) : TCP.EXPRESS_DATA) | TCP.JAVA_PEER | (Definitions.PROTOCOL_VERSION_MINOR << 16)); 184 120 TCP.serializeInetAddress(writeBufferStream, socket.getInetAddress()); 185 if (TCPSettings.DEBUG_TCP) { 186 writeBufferStream.writeByte(TCP.DEBUG_TCP_NUMBER); 187 } 121 writeBufferStream.writeByte(Definitions.DEBUG_TCP_NUMBER); 188 122 writeBufferStream.skipTargetHere(); 189 123 writeBufferToNetwork(); … … 197 131 throw new ConnectException("Partner does not speak Finroc protocol"); 198 132 } 199 if (readBufferStream.readShort() != TCP.PROTOCOL_VERSION) {133 if (readBufferStream.readShort() != Definitions.PROTOCOL_VERSION_MAJOR) { 200 134 Log.log(LogLevel.WARNING, this, "Connection partner has wrong protocol version"); 201 135 throw new ConnectException("Partner has wrong protocol version"); … … 206 140 TCP.PeerType peerType = readBufferStream.readEnum(TCP.PeerType.class); 207 141 String peerName = readBufferStream.readString(); 208 FrameworkElementInfo.StructureExchange structureExchange = readBufferStream.readEnum(FrameworkElementInfo.StructureExchange.class); // TODO: send structure 209 connectionFlags |= readBufferStream.readInt(); 210 this.bulk = (connectionFlags & TCP.BULK_DATA) != 0; 142 Definitions.StructureExchange structureExchange = readBufferStream.readEnum(Definitions.StructureExchange.class); // TODO: send structure 143 int flags = readBufferStream.readInt(); 144 int partnerFlags = flags & 0xFF; 145 int partnerSerializationRevision = flags >> 16; 146 boolean legacyPartnerRuntime = partnerSerializationRevision == 0; 147 int partnerHandleStampWidth = legacyPartnerRuntime ? 12 : ((flags >> 8) & 0xFF); 148 connectionFlags |= partnerFlags; 149 this.primary = (connectionFlags & TCP.PRIMARY_CONNECTION) != 0; 211 150 InetAddress myAddress = TCP.deserializeInetAddress(readBufferStream); 212 checkCommandEnd(readBufferStream); 151 boolean debugTcp = legacyPartnerRuntime || (!((connectionFlags & TCP.NO_DEBUG) != 0 || (flags & TCP.NO_DEBUG) != 0)); 152 checkCommandEnd(readBufferStream); // requires serialization info to bet set up 213 153 214 154 // Adjust peer management information 215 155 synchronized (peer.connectTo) { 216 156 peer.addOwnAddress(myAddress); 217 remote Part = peer.getRemotePart(uuid, peerType, peerName, socket.getInetAddress(), neverForget);218 remotePart.peerInfo.connecting = true;219 if (!remote Part.addConnection(this)) {157 remoteRuntime = peer.getRemotePart(uuid, peerType, peerName, socket.getInetAddress(), neverForget, partnerHandleStampWidth); 158 ((RemotePart)remoteRuntime).peerInfo.connecting = true; 159 if (!remoteRuntime.addConnection(this)) { 220 160 throw new ConnectException("Connection already established. Closing."); 221 161 } 222 if (peer.thisPeer.peerType != TCP.PeerType.CLIENT_ONLY) { 223 remotePart.setDesiredStructureInfo(structureExchange); 224 } 162 } 163 164 // Setup serialization info 165 int otherSerializationFlags = structureExchange.ordinal() | (debugTcp ? Definitions.INFO_FLAG_DEBUG_PROTOCOL : 0); 166 int otherDeserializationFlags = peer.structureExchange.ordinal() | Definitions.INFO_FLAG_JAVA_CLIENT | (debugTcp ? Definitions.INFO_FLAG_DEBUG_PROTOCOL : 0); 167 boolean javaPartner = (partnerFlags & TCP.JAVA_PEER) != 0; 168 boolean primaryConnection = (partnerFlags & TCP.PRIMARY_CONNECTION) != 0; 169 boolean expressOnlyConnection = (partnerFlags & TCP.EXPRESS_DATA) != 0 && (!primaryConnection) || (connectionFlags & TCP.EXPRESS_DATA) != 0; 170 if (legacyPartnerRuntime) { 171 SerializationInfo legacySerialization = new SerializationInfo(0, INITIAL_SERIALIZATION_INFO.getRegisterEntryEncodings(), otherSerializationFlags); 172 writeBufferStream.setSerializationTarget(legacySerialization); 173 SerializationInfo legacyDeserialization = new SerializationInfo(0, INITIAL_SERIALIZATION_INFO.getRegisterEntryEncodings(), otherDeserializationFlags); 174 readBufferStream.setSerializationSource(legacyDeserialization); 175 } else if (expressOnlyConnection) { 176 writeBufferStream.setSharedSerializationInfo(remoteRuntime.getPrimaryConnection().getWriteBufferStream()); 177 readBufferStream.setSharedSerializationInfo(remoteRuntime.getPrimaryConnection().getReadBufferStream()); 178 } else { 179 int revision = Math.min(Definitions.PROTOCOL_VERSION_MINOR, partnerSerializationRevision); 180 int writeEncoding = INITIAL_SERIALIZATION_INFO.getRegisterEntryEncodings(); 181 int javaEncoding = SerializationInfo.setRegisterEntryEncoding(SerializationInfo.setDefaultRegisterEntryEncoding(SerializationInfo.RegisterEntryEncoding.PUBLISH_REGISTER_ON_CHANGE, Definitions.RegisterUIDs.values().length), Definitions.RegisterUIDs.CREATE_ACTION.ordinal(), SerializationInfo.RegisterEntryEncoding.PUBLISH_REGISTER_ON_DEMAND); 182 if (javaPartner) { 183 writeEncoding = javaEncoding; 184 otherSerializationFlags |= Definitions.INFO_FLAG_JAVA_CLIENT; 185 } 186 writeBufferStream.setSerializationTarget(new SerializationInfo(revision, writeEncoding, otherSerializationFlags)); 187 readBufferStream.setSerializationSource(new SerializationInfo(revision, javaEncoding, otherDeserializationFlags)); 225 188 } 226 189 227 190 // set params 228 minUpdateInterval = bulk? TCPSettings.getInstance().minUpdateIntervalBulk : TCPSettings.getInstance().minUpdateIntervalExpress;229 maxNotAcknowledgedPackets = bulk? TCPSettings.getInstance().maxNotAcknowledgedPacketsBulk : TCPSettings.getInstance().maxNotAcknowledgedPacketsExpress;191 minUpdateInterval = primary ? TCPSettings.getInstance().minUpdateIntervalBulk : TCPSettings.getInstance().minUpdateIntervalExpress; 192 maxNotAcknowledgedPackets = primary ? TCPSettings.getInstance().maxNotAcknowledgedPacketsBulk : TCPSettings.getInstance().maxNotAcknowledgedPacketsExpress; 230 193 231 194 // Init connection 232 195 readBufferStream.setTimeout(-1); 233 String typeString = bulk ? "Bulk" : "Express";196 String typeString = primary ? "Primary" : "Express"; 234 197 235 198 // Initialize them here, so that subscribe calls from structure creation do not get lost … … 238 201 239 202 // Get framework elements from connection partner 240 if ((connectionFlags & TCP.MANAGEMENT_DATA) != 0) { 241 peer.connectionElement.getModelHandler().changeNodeName(modelNode, "Connecting to " + remotePart.peerInfo.toString() + "..."); 203 if ((connectionFlags & TCP.PRIMARY_CONNECTION) != 0) { 204 BufferedModelChanges change = new BufferedModelChanges(); 205 change.changeNodeName(modelNode, "Connecting to " + ((RemotePart)remoteRuntime).peerInfo.toString() + "..."); 206 getModelHandler().applyModelChanges(change); 207 208 242 209 //ModelNode statusNode = new ModelNode("Obtaining structure information..."); 243 210 //peer.connectionElement.getModelHandler().addNode(modelNode, statusNode); 244 211 245 212 // retrieveRemotePorts(cis, cos, updateTimes, newServer); 246 boolean newServer = true; /*(serverCreationTime < 0) || (serverCreationTime != timeBase);*/213 //boolean newServer = true; /*(serverCreationTime < 0) || (serverCreationTime != timeBase);*/ 247 214 //log(LogLevel.LL_DEBUG, this, (newServer ? "Connecting" : "Reconnecting") + " to server " + uuid.toString() + "..."); 248 215 249 // Delete any elements from previous connections 250 remotePart.deleteAllChildren();251 remote Part.createNewModel();216 // Delete any elements from previous connections (no longer necessary) 217 //remoteRuntime.deleteAllChildren(); 218 remoteRuntime.createNewModel(); 252 219 253 220 /*portIterator.reset(); … … 264 231 // Process structure packets 265 232 int structurePacketSize = readBufferStream.readInt(); 266 boolean readType = peer.structureExchange == FrameworkElementInfo.StructureExchange.SHARED_PORTS;233 boolean readType = peer.structureExchange == Definitions.StructureExchange.SHARED_PORTS; 267 234 MemoryBuffer structurePacketBuffer = new MemoryBuffer(structurePacketSize); 268 BinaryInputStream structurePacketReadStream = new BinaryInputStream(structurePacketBuffer, re moteTypes);235 BinaryInputStream structurePacketReadStream = new BinaryInputStream(structurePacketBuffer, readBufferStream); 269 236 while (structurePacketSize != 0) { 270 237 structurePacketBuffer.setSize(structurePacketSize); … … 272 239 structurePacketReadStream.reset(); 273 240 if (readType) { 274 DataTypeBase type = structurePacketReadStream.readType();241 DataTypeBase type = RemoteType.deserialize(structurePacketReadStream).getDefaultLocalDataType(); 275 242 if (type == null || type != CoreString.TYPE) { 276 243 Log.log(LogLevel.WARNING, this, "Type encoding does not seem to work"); … … 280 247 } 281 248 while (structurePacketReadStream.moreDataAvailable()) { 282 tempFrameworkElementInfo.deserialize(structurePacketReadStream, peer.structureExchange);283 remote Part.addRemoteStructure(tempFrameworkElementInfo, true);249 tempFrameworkElementInfo.deserialize(structurePacketReadStream, true); 250 remoteRuntime.addRemoteStructure(tempFrameworkElementInfo, true, null); 284 251 } 285 252 structurePacketSize = readBufferStream.readInt(); 286 253 } 287 // remote Part.initAndCheckForAdminPort(modelNode); do this later: after bulk connection has been initialized254 // remoteRuntime.initAndCheckForAdminPort(modelNode); do this later: after bulk connection has been initialized 288 255 } catch (Exception e) { 289 256 if (e.getCause() instanceof EOFException) { … … 293 260 } 294 261 //peer.connectionElement.getModelHandler().removeNode(statusNode); 295 remote Part.removeConnection(this);262 remoteRuntime.removeConnection(this); 296 263 throw e; 297 264 } … … 309 276 } 310 277 311 /** 312 * @return Is TCP connection disconnecting? 313 */ 314 public boolean disconnecting() { 315 return disconnectSignal; 316 } 317 318 /** 319 * @return Type of connection ("Bulk" oder "Express") 320 */ 321 public String getConnectionTypeString() { 322 return bulk ? "Bulk" : "Express"; 323 } 324 325 /** 326 * Close connection 327 */ 278 @Override 328 279 public synchronized void disconnect() { 329 280 disconnectSignal = true; 330 RuntimeSettings.getInstance().removeUpdateTimeChangeListener(this);331 281 synchronized (peer.connectTo) { 332 remote Part.removeConnection(this);282 remoteRuntime.removeConnection(this); 333 283 } 334 284 notifyWriter(); // stops writer … … 360 310 361 311 /** 362 * Check that command is terminated correctly when TCPSettings.DEBUG_TCP is activated363 */364 static void checkCommandEnd(BinaryInputStream stream) {365 if (TCPSettings.DEBUG_TCP) {366 int i = stream.readByte();367 if (i != TCP.DEBUG_TCP_NUMBER) {368 throw new RuntimeException("TCP Stream seems corrupt");369 }370 }371 }372 373 /**374 312 * Listens at socket for incoming data 375 313 */ … … 386 324 initThreadLocalCache(); 387 325 BinaryInputStream stream = TCPConnection.this.readBufferStream; 388 MemoryBuffer structureBuffer = new MemoryBuffer(); // structure changes are copied to this buffer and processed by model (thread) 389 BinaryOutputStream structureBufferWriter = new BinaryOutputStream(structureBuffer); 390 byte[] tempBuffer = new byte[2048]; 326 BufferedModelChanges modelChanges = new BufferedModelChanges(); 391 327 392 328 try { … … 410 346 while (lastAcknowledgedPacket != acknowledgement) { 411 347 lastAcknowledgedPacket = (lastAcknowledgedPacket + 1) & 0x7FFF; 412 pingTimes[lastAcknowledgedPacket & TCPSettings.AVG_PING_PACKETS] =413 (int)(curTime - sentPacketTime[lastAcknowledgedPacket & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS]);348 pingTimes[lastAcknowledgedPacket & Definitions.AVG_PING_PACKETS] = 349 (int)(curTime - sentPacketTime[lastAcknowledgedPacket & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS]); 414 350 } 415 351 updatePingStatistics(); … … 417 353 418 354 while (stream.getAbsoluteReadPosition() < nextPacketStart) { 419 TCP.OpCode opCode = stream.readEnum(TCP.OpCode.class);420 if (opCode.ordinal() >= TCP.OpCode.OTHER.ordinal()) {355 Definitions.OpCode opCode = stream.readEnum(Definitions.OpCode.class); 356 if (opCode.ordinal() >= Definitions.OpCode.OTHER.ordinal()) { 421 357 Log.log(LogLevel.WARNING, this, "Received corrupted TCP message batch. Invalid opcode. Skipping."); 422 358 stream.skip((int)(nextPacketStart - stream.getAbsoluteReadPosition())); 423 359 break; 424 360 } 425 TCP.MessageSize messageSizeEncoding = TCP.MESSAGE_SIZES[opCode.ordinal()];426 long messageSize = messageSizeEncoding == TCP.MessageSize.FIXED ? -1 :427 (messageSizeEncoding == TCP.MessageSize.VARIABLE_UP_TO_255_BYTE ? (stream.readByte() & 0xFF) : stream.readInt());361 Definitions.MessageSize messageSizeEncoding = Definitions.MESSAGE_SIZES[opCode.ordinal()]; 362 long messageSize = messageSizeEncoding == Definitions.MessageSize.FIXED ? -1 : 363 (messageSizeEncoding == Definitions.MessageSize.VARIABLE_UP_TO_255_BYTE ? (stream.readByte() & 0xFF) : stream.readInt()); 428 364 //long messageEncodingSize = messageSizeEncoding.ordinal() * messageSizeEncoding.ordinal(); // :-) 429 365 long commandStartPosition = stream.getAbsoluteReadPosition(); … … 436 372 437 373 // Copy to structure buffer? 438 if (opCode == OpCode.STRUCTURE_CREATE || opCode == OpCode.STRUCTURE_CHANGE || opCode == OpCode.STRUCTURE_DELETE) { 374 /*if (opCode == Definitions.OpCode.STRUCTURE_CREATED || opCode == Definitions.OpCode.STRUCTURE_CHANGED || opCode == Definitions.OpCode.STRUCTURE_DELETED) { 375 remoteRuntime.processStructure 376 // TODO because of register entries this needs to be read now 439 377 structureBufferWriter.writeEnum(opCode); 440 long remaining = (opCode == OpCode.STRUCTURE_DELETE) ? (TCPSettings.DEBUG_TCP ? 5 : 4) : messageSize;378 long remaining = (opCode == Definitions.OpCode.STRUCTURE_DELETED) ? (TCPSettings.DEBUG_TCP ? 5 : 4) : messageSize; 441 379 while (remaining > 0) { 442 380 long copy = Math.min(tempBuffer.length, remaining); … … 445 383 remaining -= copy; 446 384 } 447 } else { 448 try { 449 remotePart.processMessage(opCode, stream, remoteTypes, TCPConnection.this); 450 checkCommandEnd(stream); 451 } catch (Exception e) { 452 Log.log(LogLevel.WARNING, this, "Failed to deserialize message of type " + opCode.toString() + ". Skipping. Reason: ", e); 453 long skip = nextCommandStartPosition - stream.getAbsoluteReadPosition(); 385 } else {*/ 386 try { 387 remoteRuntime.processMessage(opCode, stream, TCPConnection.this, modelChanges); 388 checkCommandEnd(stream); 389 } catch (Exception e) { 390 Log.log(LogLevel.WARNING, this, "Failed to deserialize message of type " + opCode.toString() + ". Skipping. Reason: ", e); 391 long skip = nextCommandStartPosition - stream.getAbsoluteReadPosition(); 392 if (skip >= 0) { 393 stream.skip((int)skip); 394 } else { 395 skip = nextPacketStart - stream.getAbsoluteReadPosition(); 454 396 if (skip >= 0) { 455 stream.skip((int)skip);397 Log.log(LogLevel.WARNING, this, "Too much stream content was read. This is not handled yet. Moving to next message batch."); // TODO 456 398 } else { 457 skip = nextPacketStart - stream.getAbsoluteReadPosition(); 458 if (skip >= 0) { 459 Log.log(LogLevel.WARNING, this, "Too much stream content was read. This is not handled yet. Moving to next message batch."); // TODO 460 } else { 461 Log.log(LogLevel.WARNING, this, "Too much stream content was read - exceeding message batch. This is not handled yet. Disconnecting."); // TODO 462 } 399 Log.log(LogLevel.WARNING, this, "Too much stream content was read - exceeding message batch. This is not handled yet. Disconnecting."); // TODO 463 400 } 464 401 } 402 //} 465 403 } 466 404 } 467 405 406 if (!modelChanges.empty()) { 407 getModelHandler().applyModelChanges(modelChanges); 408 modelChanges = new BufferedModelChanges(); 409 } 410 411 /* 468 412 structureBufferWriter.flush(); 469 413 if (structureBuffer.getSize() > 0) { … … 475 419 @Override 476 420 public void run() { 477 remote Part.processStructurePacket(structureBufferToProcess, remoteTypes);421 remoteRuntime.processStructurePacket(structureBufferToProcess, remoteTypes); 478 422 } 479 423 }); 480 } 424 }*/ 481 425 } 482 426 } catch (Exception e) { 427 disconnected = true; 483 428 if (e instanceof RuntimeException && e.getCause() != null) { 484 429 e = (Exception)e.getCause(); … … 489 434 } 490 435 try { 491 remote Part.disconnect();436 remoteRuntime.disconnect(); 492 437 } catch (Exception e) { 493 438 Log.log(LogLevel.WARNING, this, e); … … 497 442 stream.close(); 498 443 } catch (Exception e) {} 444 disconnected = true; 499 445 } 500 446 … … 525 471 // } 526 472 527 /** 528 * Notify (possibly wake-up) writer thread. Should be called whenever new tasks for the writer arrive. 529 */ 473 @Override 530 474 public void notifyWriter() { 531 475 Writer lockedWriter = writer; 532 476 if (lockedWriter != null) { 533 477 lockedWriter.notifyWriter(); 534 }535 }536 537 /** Class to store a pair - call and timeout - in a list */538 static class CallAndTimeout {539 540 long timeoutTime;541 AbstractCall call;542 543 public CallAndTimeout(long timeoutTime, AbstractCall call) {544 this.timeoutTime = timeoutTime;545 this.call = call;546 478 } 547 479 } … … 574 506 575 507 /** Queue with TCP commands waiting to be sent */ 576 private final WonderQueue<Serialized TCPCommand> tcpCallsToSend = new WonderQueue<SerializedTCPCommand>();508 private final WonderQueue<SerializedMessage> tcpCallsToSend = new WonderQueue<SerializedMessage>(); 577 509 578 510 /** List with calls that were not ready for sending yet */ … … 610 542 public boolean canSend() { 611 543 int maxNotAck = maxNotAcknowledgedPackets.getValue(); 612 return curPacketIndex < lastAcknowledgedPacket + maxNotAck || (maxNotAck <= 0 && curPacketIndex < lastAcknowledgedPacket + TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS);544 return curPacketIndex < lastAcknowledgedPacket + maxNotAck || (maxNotAck <= 0 && curPacketIndex < lastAcknowledgedPacket + Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS); 613 545 } 614 546 … … 626 558 if (disconnectSignal) { 627 559 try { 628 remote Part.disconnect();560 remoteRuntime.disconnect(); 629 561 } catch (Exception e) { 630 562 Log.log(LogLevel.WARNING, this, e); … … 679 611 680 612 // send port data 681 ArrayWrapper< TCPPort> it = monitoredPorts.getIterable();613 ArrayWrapper<RemoteProxyPort> it = monitoredPorts.getIterable(); 682 614 byte changedFlag = 0; 683 615 for (int i = 0, n = it.size(); i < n; i++) { 684 TCPPort pp = it.get(i);616 RemoteProxyPort pp = it.get(i); 685 617 if (pp != null && pp.getPort().isReady()) { 686 618 if (pp.getLastUpdate() + pp.getUpdateIntervalForNet() > startTime) { … … 737 669 writeBuffer.getBuffer().putShort(4, curPacketIndex & 0x7FFF); // TODO 738 670 lastAcknowledgementRequestTime = System.currentTimeMillis(); 739 sentPacketTime[curPacketIndex & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS] = lastAcknowledgementRequestTime;671 sentPacketTime[curPacketIndex & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS] = lastAcknowledgementRequestTime; 740 672 } 741 673 … … 752 684 } 753 685 } catch (Exception e) { 754 686 disconnected = true; 755 687 if (e instanceof RuntimeException && e.getCause() != null) { 756 688 e = (Exception)e.getCause(); … … 761 693 762 694 try { 763 remote Part.disconnect();695 remoteRuntime.disconnect(); 764 696 } catch (Exception e2) { 765 697 Log.log(LogLevel.WARNING, this, e2); … … 770 702 stream.close(); 771 703 } catch (Exception e) {} 704 disconnected = true; 772 705 } 773 706 … … 846 779 } 847 780 848 Serialized TCPCommandtcpCall = null;781 SerializedMessage tcpCall = null; 849 782 while ((tcpCall = tcpCallsToSend.dequeue()) != null) { 850 783 tcpCall.serialize(stream); … … 874 807 boolean expectsResponse = call.expectsResponse(); 875 808 if (expectsResponse) { 876 call.setCallId(remote Part.nextCallId.incrementAndGet());877 } 878 879 writeBufferStream.writeEnum( TCP.OpCode.RPC_CALL);809 call.setCallId(remoteRuntime.getUniqueCallId()); 810 } 811 812 writeBufferStream.writeEnum(Definitions.OpCode.RPC_CALL); 880 813 writeBufferStream.writeSkipOffsetPlaceholder(); 881 814 writeBufferStream.writeInt(call.getRemotePortHandle()); … … 883 816 call.serialize(writeBufferStream); 884 817 if (TCPSettings.DEBUG_TCP) { 885 writeBufferStream.writeByte( TCP.DEBUG_TCP_NUMBER);818 writeBufferStream.writeByte(Definitions.DEBUG_TCP_NUMBER); 886 819 } 887 820 writeBufferStream.skipTargetHere(); … … 943 876 * @param call Call object 944 877 */ 945 public void sendCall(Serialized TCPCommandcall) {878 public void sendCall(SerializedMessage call) { 946 879 //call.responsibleThread = -1; 947 880 tcpCallsToSend.enqueue(call); … … 949 882 } 950 883 } 951 952 /**953 * Checks whether any waiting calls have timed out.954 * Removes any timed out calls from list.955 *956 * @return Are still calls waiting?957 */958 public boolean checkWaitingCallsForTimeout(long timeNow) {959 synchronized (callsAwaitingResponse) {960 for (int i = 0; i < callsAwaitingResponse.size(); i++) {961 if (timeNow > callsAwaitingResponse.get(i).timeoutTime) {962 callsAwaitingResponse.get(i).call.setException(FutureStatus.TIMEOUT);963 callsAwaitingResponse.remove(i);964 i--;965 }966 }967 return callsAwaitingResponse.size() > 0;968 }969 }970 971 /**972 * Should be called regularly by monitoring thread to check whether critical973 * ping time threshold is exceeded.974 *975 * @return Time the calling thread may wait before calling again (it futile to call this method before)976 */977 public long checkPingForDisconnect() {978 Writer lockedWriter = writer;979 if (lockedWriter == null) {980 return TCPSettings.getInstance().criticalPingThreshold.getValue();981 }982 if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) {983 long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS];984 long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis();985 if (timeLeft < 0) {986 handlePingTimeExceed();987 return TCPSettings.getInstance().criticalPingThreshold.getValue();988 }989 return timeLeft;990 } else {991 return TCPSettings.getInstance().criticalPingThreshold.getValue();992 }993 }994 995 /**996 * Updates ping statistic variables997 */998 private void updatePingStatistics() {999 int result = 0;1000 int resultAvg = 0;1001 for (int i = 0; i < pingTimes.length; i++) {1002 result = Math.max(result, pingTimes[i]);1003 resultAvg += pingTimes[i];1004 }1005 maxPingTime = result;1006 avgPingTime = resultAvg / pingTimes.length;1007 }1008 1009 /**1010 * @return Maximum ping time among last TCPSettings.AVG_PING_PACKETS packets1011 */1012 public int getMaxPingTime() {1013 return maxPingTime;1014 }1015 1016 /**1017 * @return Average ping time among last TCPSettings.AVG_PING_PACKETS packets1018 */1019 public int getAvgPingTime() {1020 return avgPingTime;1021 }1022 1023 /**1024 * @return Is critical ping time currently exceeded (possibly temporary disconnect)1025 */1026 public boolean pingTimeExceeed() {1027 Writer lockedWriter = writer;1028 if (lockedWriter == null) {1029 return false;1030 }1031 if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) {1032 long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS];1033 long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis();1034 return timeLeft < 0;1035 } else {1036 return false;1037 }1038 }1039 1040 /**1041 * Called when critical ping time threshold was exceeded1042 */1043 public void handlePingTimeExceed() {} // TODO1044 884 1045 885 // /** … … 1092 932 // } 1093 933 1094 /** 1095 * Send call to connection partner 1096 * 1097 * @param call Call object 1098 */ 934 @Override 1099 935 public void sendCall(AbstractCall call) { 1100 936 Writer lockedWriter = writer; … … 1106 942 } 1107 943 1108 /** 1109 * Send serialized TCP call to connection partner 1110 * 1111 * @param call Call object 1112 */ 1113 public void sendCall(SerializedTCPCommand call) { 944 @Override 945 public void sendCall(SerializedMessage call) { 1114 946 Writer lockedWriter = writer; 1115 947 if (lockedWriter != null && (!disconnectSignal)) { … … 1118 950 } 1119 951 952 /** 953 * @return Is critical ping time currently exceeded (possibly temporary disconnect) 954 */ 1120 955 @Override 1121 public void updateTimeChanged(DataTypeBase dt, short newUpdateTime) { 1122 // forward update time change to connection partner 1123 SerializedTCPCommand command = new SerializedTCPCommand(TCP.OpCode.TYPE_UPDATE, 8); 1124 command.getWriteStream().writeShort(dt == null ? -1 : dt.getUid()); 1125 command.getWriteStream().writeShort(newUpdateTime); 1126 sendCall(command); 956 public boolean pingTimeExceeed() { 957 Writer lockedWriter = writer; 958 if (lockedWriter == null) { 959 return false; 960 } 961 if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) { 962 long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS]; 963 long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis(); 964 return timeLeft < 0; 965 } else { 966 return false; 967 } 968 } 969 970 /** 971 * Should be called regularly by monitoring thread to check whether critical 972 * ping time threshold is exceeded. 973 * 974 * @return Time the calling thread may wait before calling again (it futile to call this method before) 975 */ 976 public long checkPingForDisconnect() { 977 Writer lockedWriter = writer; 978 if (lockedWriter == null) { 979 return TCPSettings.getInstance().criticalPingThreshold.getValue(); 980 } 981 if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) { 982 long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS]; 983 long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis(); 984 if (timeLeft < 0) { 985 handlePingTimeExceed(); 986 return TCPSettings.getInstance().criticalPingThreshold.getValue(); 987 } 988 return timeLeft; 989 } else { 990 return TCPSettings.getInstance().criticalPingThreshold.getValue(); 991 } 992 } 993 994 /** 995 * Writes all contents of write buffer to network and empties and resets it 996 */ 997 protected void writeBufferToNetwork() throws IOException { 998 writeBufferStream.flush(); 999 socketOutputStream.write(writeBuffer.getBuffer().getBuffer().array(), 0, writeBuffer.getSize()); 1000 writeBufferStream.reset(); 1001 } 1002 1003 @Override 1004 public ModelHandler getModelHandler() { 1005 return peer.connectionElement.getModelHandler(); 1006 } 1007 1008 /** 1009 * @return Whether connection has been disconnected - or is disconnected 1010 */ 1011 public boolean isDisconnected() { 1012 return disconnected; 1127 1013 } 1128 1014 … … 1240 1126 // } 1241 1127 1242 /**1243 * @return Data rate of bytes read from network (in bytes/s)1244 */1245 public int getRx() {1246 long lastTime = lastRxTimestamp;1247 long lastPos = lastRxPosition;1248 lastRxTimestamp = Time.getCoarse();1249 lastRxPosition = readBufferStream.getAbsoluteReadPosition();1250 if (lastTime == 0) {1251 return 0;1252 }1253 if (lastRxTimestamp == lastTime) {1254 return 0;1255 }1256 1257 double data = lastRxPosition - lastPos;1258 double interval = (lastRxTimestamp - lastTime) / 1000;1259 return (int)(data / interval);1260 }1261 1262 public String toString() {1263 return description;1264 }1265 1266 /**1267 * Writes all contents of write buffer to network and empties and resets it1268 */1269 protected void writeBufferToNetwork() throws IOException {1270 writeBufferStream.flush();1271 socketOutputStream.write(writeBuffer.getBuffer().getBuffer().array(), 0, writeBuffer.getSize());1272 writeBufferStream.reset();1273 }1274 1275 /**1276 * Subscribe to port changes on remote server1277 *1278 * @param index Port index in remote runtime1279 * @param strategy Strategy to use/request1280 * @param updateInterval Minimum interval in ms between notifications (values <= 0 mean: use server defaults)1281 * @param localIndex Local Port Index1282 * @param dataType DataType got from server1283 * @param enc Data type to use when writing data to the network1284 */1285 public void subscribe(int index, short strategy, boolean reversePush, short updateInterval, int localIndex, Serialization.DataEncoding enc) {1286 SerializedTCPCommand command = new SerializedTCPCommand(TCP.OpCode.SUBSCRIBE, 16);1287 command.getWriteStream().writeInt(index);1288 command.getWriteStream().writeShort(strategy);1289 command.getWriteStream().writeBoolean(reversePush);1290 command.getWriteStream().writeShort(updateInterval);1291 command.getWriteStream().writeInt(localIndex);1292 command.getWriteStream().writeEnum(enc);1293 sendCall(command);1294 }1295 1296 /**1297 * Unsubscribe from port changes on remote server1298 *1299 * @param index Port index in remote runtime1300 */1301 public void unsubscribe(int index) {1302 SerializedTCPCommand command = new SerializedTCPCommand(TCP.OpCode.UNSUBSCRIBE, 8);1303 command.getWriteStream().writeInt(index);1304 sendCall(command);1305 }1306 1307 /**1308 * @param portIndex Index of port1309 * @return TCPPort for specified index1310 */1311 protected TCPPort lookupPortForCallHandling(int portIndex) {1312 AbstractPort ap = RuntimeEnvironment.getInstance().getPort(portIndex);1313 TCPPort p = null;1314 if (ap != null) {1315 p = (TCPPort)ap.asNetPort();1316 assert(p != null);1317 }1318 return p;1319 }1320 1321 @Override1322 public void sendResponse(AbstractCall responseToSend) {1323 sendCall(responseToSend);1324 }1325 1128 } -
internal/TCPPeer.java
r111 r114 36 36 import org.finroc.core.FrameworkElement.ChildIterator; 37 37 import org.finroc.core.RuntimeSettings; 38 import org.finroc.core.datatype.FrameworkElementInfo;39 38 import org.finroc.core.plugin.ExternalConnection; 39 import org.finroc.core.remote.BufferedModelChanges; 40 import org.finroc.core.remote.Definitions; 40 41 import org.finroc.core.remote.ModelNode; 41 42 import org.finroc.plugins.tcp.internal.TCP.PeerType; … … 70 71 71 72 /** Amount of structure this client is interested in */ 72 final FrameworkElementInfo.StructureExchange structureExchange;73 final Definitions.StructureExchange structureExchange; 73 74 74 75 /** … … 115 116 * @param serverListenAddress The address that server is supposed to listen on ("" will enable IPv6) 116 117 */ 117 public TCPPeer(ExternalConnection frameworkElement, String peerName, TCP.PeerType peerType, FrameworkElementInfo.StructureExchange structureExchange, String networkConnection,118 public TCPPeer(ExternalConnection frameworkElement, String peerName, TCP.PeerType peerType, Definitions.StructureExchange structureExchange, String networkConnection, 118 119 int preferredServerPort, boolean tryNextPortsIfOccupied, boolean autoConnectToAllPeers, String serverListenAddress) { 119 120 this.connectionElement = frameworkElement; … … 244 245 activelyConnect = true; 245 246 modelRootNode = new ModelNode("TCP"); 246 connectionElement.getModelHandler().setModelRoot(modelRootNode); 247 BufferedModelChanges changes = new BufferedModelChanges(); 248 changes.setModelRoot(modelRootNode); 249 connectionElement.getModelHandler().applyModelChanges(changes); 247 250 248 251 if (address.length() > 0) { … … 368 371 if (fe instanceof RemotePart && fe.isReady()) { 369 372 RemotePart rs = (RemotePart)fe; 370 if (!rs. peerInfo.connected) {373 if (!rs.isConnected()) { 371 374 return 0; 372 375 } … … 388 391 if (fe instanceof RemotePart && fe.isReady()) { 389 392 RemotePart rs = (RemotePart)fe; 390 if (!rs. peerInfo.connected) {393 if (!rs.isConnected()) { 391 394 continue; 392 395 } … … 411 414 */ 412 415 public boolean isAdminConnection() { 413 return structureExchange == FrameworkElementInfo.StructureExchange.FINSTRUCT;416 return structureExchange == Definitions.StructureExchange.FINSTRUCT; 414 417 } 415 418 … … 445 448 * @param address IP address of remote part 446 449 * @param neverForget Is this a remote peer to never forget? 450 * @param partnerHandleStampWidth Stamp Width of partner's framework element handles 447 451 * @return Pointer to remote part 448 452 */ 449 public RemotePart getRemotePart(UUID uuid, TCP.PeerType peerType, String peerName, InetAddress address, boolean neverForget ) throws Exception {453 public RemotePart getRemotePart(UUID uuid, TCP.PeerType peerType, String peerName, InetAddress address, boolean neverForget, int partnerHandleStampWidth) throws Exception { 450 454 synchronized (connectTo) { 451 455 if (uuid.equals(this.thisPeer.uuid)) { … … 457 461 if (uuid.equals(info.uuid)) { 458 462 if (info.remotePart == null) { 459 info.remotePart = new RemotePart(info, connectionElement, this );463 info.remotePart = new RemotePart(info, connectionElement, this, partnerHandleStampWidth); 460 464 info.remotePart.init(); 461 465 } … … 475 479 info.name = peerName; 476 480 info.neverForget = neverForget; 477 info.remotePart = new RemotePart(info, connectionElement, this );481 info.remotePart = new RemotePart(info, connectionElement, this, partnerHandleStampWidth); 478 482 info.remotePart.init(); 479 483 return info.remotePart; … … 511 515 } 512 516 if (modelRootNode != null) { 513 connectionElement.getModelHandler().removeNode(modelRootNode); 517 BufferedModelChanges changes = new BufferedModelChanges(); 518 changes.removeNode(modelRootNode); 519 connectionElement.getModelHandler().applyModelChanges(changes); 514 520 } 515 521 modelRootNode = null; … … 522 528 */ 523 529 private void serializePeerInfo(BinaryOutputStream stream, PeerInfo peer) { 524 if ((peer == thisPeer || peer. connected) && peer.peerType != TCP.PeerType.CLIENT_ONLY) {530 if ((peer == thisPeer || peer.isConnected()) && peer.peerType != TCP.PeerType.CLIENT_ONLY) { 525 531 stream.writeBoolean(true); 526 532 … … 666 672 667 673 for (PeerInfo info : otherPeers) { 668 if ((!info. connected) && (!info.connecting) && (info.peerType != PeerType.CLIENT_ONLY)) {674 if ((!info.isConnected()) && (!info.connecting) && (info.peerType != PeerType.CLIENT_ONLY)) { 669 675 new PeerConnectorThread(info).start(); 670 676 } … … 689 695 this.connectTo = connectTo; 690 696 modelNode = new ModelNode("Looking for " + TCP.formatInetSocketAddress(connectTo) + "..."); 691 connectionElement.getModelHandler().addNode(modelRootNode, modelNode); 697 BufferedModelChanges changes = new BufferedModelChanges(); 698 changes.addNode(modelRootNode, modelNode); 699 connectionElement.getModelHandler().applyModelChanges(changes); 692 700 } 693 701 … … 698 706 stopThread(); 699 707 if (modelRootNode != null) { 700 connectionElement.getModelHandler().removeNode(modelNode); 708 BufferedModelChanges changes = new BufferedModelChanges(); 709 changes.removeNode(modelNode); 710 connectionElement.getModelHandler().applyModelChanges(changes); 701 711 } 702 712 if (peerInfo != null) { … … 710 720 try { 711 721 socket.connect(connectTo); 712 connection1 = new TCPConnection(TCPPeer.this, socket, TCP. BULK_DATA, false, modelNode);722 connection1 = new TCPConnection(TCPPeer.this, socket, TCP.PRIMARY_CONNECTION | TCP.BULK_DATA, false, modelNode); 713 723 //new TCPConnection(TCPPeer.this, socket, TCP.MANAGEMENT_DATA | TCP.EXPRESS_DATA, true, modelNode); 714 724 socket = new Socket(); 715 725 socket.connect(connectTo); 716 726 //new TCPConnection(TCPPeer.this, socket, TCP.BULK_DATA, false, modelNode); 717 connection2 = new TCPConnection(TCPPeer.this, socket, TCP. MANAGEMENT_DATA | TCP.EXPRESS_DATA, true, modelNode);727 connection2 = new TCPConnection(TCPPeer.this, socket, TCP.EXPRESS_DATA, true, modelNode); 718 728 peerInfo = findPeerInfoFor(connectTo); 719 729 peerInfo.remotePart.initAndCheckForAdminPort(modelNode); … … 733 743 peerInfo.remotePart.deleteAllChildren(); 734 744 } 735 connectionElement.getModelHandler().changeNodeName(modelNode, "Looking for " + TCP.formatInetSocketAddress(connectTo) + "..."); 736 //e.printStackTrace(); 745 BufferedModelChanges changes = new BufferedModelChanges(); 746 changes.changeNodeName(modelNode, "Looking for " + TCP.formatInetSocketAddress(connectTo) + "..."); 747 connectionElement.getModelHandler().applyModelChanges(changes); 748 e.printStackTrace(); 737 749 socket.close(); 738 750 // simply try again … … 756 768 peer.connecting = true; 757 769 modelNode = new ModelNode("Looking for " + peer.uuid.toString() + "..."); 758 connectionElement.getModelHandler().addNode(modelRootNode, modelNode); 770 BufferedModelChanges changes = new BufferedModelChanges(); 771 changes.addNode(modelRootNode, modelNode); 772 connectionElement.getModelHandler().applyModelChanges(changes); 759 773 } 760 774 761 775 @Override 762 776 public void mainLoopCallback() throws Exception { 763 if ((!activelyConnect) || peer. connected) {777 if ((!activelyConnect) || peer.isConnected()) { 764 778 stopThread(); 765 779 if (modelRootNode != null) { 766 connectionElement.getModelHandler().removeNode(modelNode); 780 BufferedModelChanges changes = new BufferedModelChanges(); 781 changes.removeNode(modelNode); 782 connectionElement.getModelHandler().applyModelChanges(changes); 767 783 } 768 784 peer.connecting = false; … … 777 793 TCPConnection connection1 = null, connection2 = null; 778 794 try { 779 if (peer.remotePart == null || peer.remotePart.bulkConnection == null) { 795 RemotePart remotePart = peer.remotePart; 796 if (remotePart != null && (!remotePart.isConnected())) { 797 continue; // disconnecting - wait 798 } 799 800 if (remotePart == null || remotePart.getPrimaryConnection() == null) { 780 801 socket.connect(socketAddress); 781 802 //new TCPConnection(TCPPeer.this, socket, TCP.MANAGEMENT_DATA | TCP.EXPRESS_DATA, false, modelNode); 782 connection1 = new TCPConnection(TCPPeer.this, socket, TCP. BULK_DATA, false, modelNode);783 } 784 if (peer.remotePart == null || peer.remotePart. managementConnection== null) {803 connection1 = new TCPConnection(TCPPeer.this, socket, TCP.PRIMARY_CONNECTION | TCP.BULK_DATA, false, modelNode); 804 } 805 if (peer.remotePart == null || peer.remotePart.getExpressConnection() == null) { 785 806 socket = new Socket(); 786 807 socket.connect(socketAddress); 787 808 //new TCPConnection(TCPPeer.this, socket, TCP.BULK_DATA, false, modelNode); 788 connection2 = new TCPConnection(TCPPeer.this, socket, TCP. MANAGEMENT_DATA | TCP.EXPRESS_DATA, false, modelNode);809 connection2 = new TCPConnection(TCPPeer.this, socket, TCP.EXPRESS_DATA, false, modelNode); 789 810 } 790 811 peer.remotePart.initAndCheckForAdminPort(modelNode); … … 816 837 peer.remotePart.deleteAllChildren(); 817 838 } 818 connectionElement.getModelHandler().changeNodeName(modelNode, "Looking for " + peer.uuid.toString() + "..."); 839 BufferedModelChanges changes = new BufferedModelChanges(); 840 changes.changeNodeName(modelNode, "Looking for " + peer.uuid.toString() + "..."); 841 connectionElement.getModelHandler().applyModelChanges(changes); 819 842 socket.close(); 820 843 // simply try again
Note: See TracChangeset
for help on using the changeset viewer.