Changeset 114:05d0451ae828 in finroc_plugins_tcp-java


Ignore:
Timestamp:
08.06.2017 03:24:53 (6 years ago)
Author:
Max Reichardt <mreichardt@…>
Branch:
17.03
Phase:
public
Message:

Factors out TCP-independent part of protocol (similar to what was done in C++) and adapts to changes Finroc17 (e.g. management of serialization version)

Files:
2 deleted
7 edited

Legend:

Unmodified
Added
Removed
  • Peer.java

    r91 r114  
    2222package org.finroc.plugins.tcp; 
    2323 
    24 import org.finroc.core.datatype.FrameworkElementInfo; 
    2524import org.finroc.core.plugin.ExternalConnection; 
     25import org.finroc.core.remote.Definitions; 
    2626import org.finroc.plugins.tcp.internal.TCP; 
    2727import org.finroc.plugins.tcp.internal.TCPPeer; 
     
    5858            serverListenAddress = "0.0.0.0"; 
    5959        } 
    60         implementation = new TCPPeer(this, peerName, TCP.PeerType.FULL, FrameworkElementInfo.StructureExchange.SHARED_PORTS, networkConnection, 
     60        implementation = new TCPPeer(this, peerName, TCP.PeerType.FULL, Definitions.StructureExchange.SHARED_PORTS, networkConnection, 
    6161                                     preferredServerPort, tryNextPortsIfOccupied, autoConnectToAllPeers, serverListenAddress); 
    6262    } 
     
    7575            serverListenAddress = "0.0.0.0"; 
    7676        } 
    77         implementation = new TCPPeer(this, peerName, TCP.PeerType.SERVER_ONLY, FrameworkElementInfo.StructureExchange.SHARED_PORTS, "", 
     77        implementation = new TCPPeer(this, peerName, TCP.PeerType.SERVER_ONLY, Definitions.StructureExchange.SHARED_PORTS, "", 
    7878                                     preferredServerPort, tryNextPortsIfOccupied, false, serverListenAddress); 
    7979    } 
     
    8787     * @param autoConnectToAllPeers Auto-connect to all peers that become known? 
    8888     */ 
    89     public Peer(String peerName, String networkConnection, FrameworkElementInfo.StructureExchange structureExchange, boolean autoConnectToAllPeers) { 
     89    public Peer(String peerName, String networkConnection, Definitions.StructureExchange structureExchange, boolean autoConnectToAllPeers) { 
    9090        super("TCP", networkConnection); 
    9191        implementation = new TCPPeer(this, peerName, TCP.PeerType.CLIENT_ONLY, structureExchange, networkConnection, -1, false, autoConnectToAllPeers, ""); 
  • TCPSettings.java

    r101 r114  
    5050    public static final int DEQUEUE_QUEUE_SIZE = 50; 
    5151 
    52     /** Maximum not acknowledged Packet */ 
    53     public static final int MAX_NOT_ACKNOWLEDGED_PACKETS = 0x1F; // 32 (2^x for fast modulo) 
    54  
    55     /** Packets considered when calculating avergage ping time */ 
    56     public static final int AVG_PING_PACKETS = 0x7; // 8 (2^x for fast modulo) 
    57  
    5852    /** Help for debugging: insert checks in data stream => more bandwidth */ 
    5953    public static final boolean DEBUG_TCP = true; 
  • internal/PeerInfo.java

    r109 r114  
    5757    public ArrayList<InetAddress> addresses = new ArrayList<InetAddress>(); 
    5858 
    59     /** Are we currently connected with this peer? */ 
    60     public volatile boolean connected; 
    61  
    6259    /** True, if there are ongoing attempts to connect to this peer */ 
    6360    public volatile boolean connecting; 
     
    8885    } 
    8986 
     87    /** Whether there is currently a connection to this peer */ 
     88    public boolean isConnected() { 
     89        return remotePart != null && remotePart.isConnected(); 
     90    } 
     91 
    9092    public String toString() { 
    9193        return name != null ? (name + " (" + uuid.toString() + ")") : uuid.toString(); 
  • internal/RemotePart.java

    r107 r114  
    2222package org.finroc.plugins.tcp.internal; 
    2323 
    24 import java.util.ArrayList; 
    25 import java.util.HashMap; 
    26 import java.util.List; 
    2724import java.util.concurrent.atomic.AtomicInteger; 
    28 import java.util.concurrent.atomic.AtomicLong; 
    2925 
    3026import org.finroc.core.FrameworkElement; 
    31 import org.finroc.core.FrameworkElementFlags; 
    32 import org.finroc.core.FrameworkElementTags; 
    33 import org.finroc.core.LockOrderLevels; 
    3427import org.finroc.core.RuntimeEnvironment; 
    35 import org.finroc.core.admin.AdminClient; 
    36 import org.finroc.core.admin.AdministrationService; 
    37 import org.finroc.core.datatype.FrameworkElementInfo; 
    38 import org.finroc.core.port.AbstractPort; 
    39 import org.finroc.core.port.PortCreationInfo; 
    40 import org.finroc.core.port.cc.CCPortBase; 
    41 import org.finroc.core.port.cc.CCPortDataManager; 
    42 import org.finroc.core.port.cc.CCPortDataManagerTL; 
    43 import org.finroc.core.port.cc.CCPullRequestHandler; 
    44 import org.finroc.core.port.net.NetPort; 
    45 import org.finroc.core.port.rpc.RPCInterfaceType; 
    46 import org.finroc.core.port.rpc.internal.AbstractCall; 
    47 import org.finroc.core.port.rpc.internal.RPCMessage; 
    48 import org.finroc.core.port.rpc.internal.RPCPort; 
    49 import org.finroc.core.port.rpc.internal.RPCRequest; 
    50 import org.finroc.core.port.rpc.internal.RPCResponse; 
    51 import org.finroc.core.port.rpc.internal.AbstractCall.CallType; 
    52 import org.finroc.core.port.std.PortBase; 
    53 import org.finroc.core.port.std.PortDataManager; 
    54 import org.finroc.core.port.std.PullRequestHandler; 
    55 import org.finroc.core.portdatabase.CCType; 
    56 import org.finroc.core.portdatabase.FinrocTypeInfo; 
    57 import org.finroc.core.remote.ModelNode; 
    58 import org.finroc.core.remote.RemoteFrameworkElement; 
    59 import org.finroc.core.remote.RemotePort; 
    60 import org.finroc.core.remote.RemoteRuntime; 
    61 import org.finroc.core.remote.RemoteTypes; 
    62 import org.finroc.plugins.tcp.internal.TCP.OpCode; 
     28import org.finroc.core.net.generic_protocol.Connection; 
     29import org.finroc.core.net.generic_protocol.RemoteRuntime; 
     30import org.finroc.core.remote.BufferedModelChanges; 
     31import org.finroc.core.remote.Definitions; 
     32import org.finroc.core.remote.ModelHandler; 
    6333import org.rrlib.logging.Log; 
    6434import org.rrlib.logging.LogLevel; 
    6535import org.rrlib.serialization.BinaryInputStream; 
    66 import org.rrlib.serialization.MemoryBuffer; 
    67 import org.rrlib.serialization.Serialization; 
    68 import org.rrlib.serialization.compression.Compressible; 
    69 import org.rrlib.serialization.rtti.DataTypeBase; 
    7036 
    7137/** 
     
    7541 * It creates a proxy port for each shared port in the remote runtime. 
    7642 */ 
    77 public class RemotePart extends FrameworkElement implements PullRequestHandler, CCPullRequestHandler { 
     43public class RemotePart extends RemoteRuntime { 
    7844 
    7945    /** Peer info that this part belongs to */ 
     
    8349    final TCPPeer peerImplementation; 
    8450 
    85     /** Connection to transfer "express" ports data */ 
    86     TCPConnection expressConnection; 
    87  
    88     /** Connection to transfer "bulk" ports data */ 
    89     TCPConnection bulkConnection; 
    90  
    91     /** Connection to transfer all other "management" data */ 
    92     TCPConnection managementConnection; 
    93  
    94     /** Structure information to send to remote part */ 
    95     private FrameworkElementInfo.StructureExchange sendStructureInfo = FrameworkElementInfo.StructureExchange.NONE; 
    96  
    97     /** Framework element that contains all global links - possibly NULL */ 
    98     private FrameworkElement globalLinks; 
    99  
    100     /** Framework element that contains all server ports - possibly NULL */ 
    101     private FrameworkElement serverPorts; 
    102  
    103     /** 
    104      * Lookup for remote framework elements (currently not ports) - similar to remote CoreRegister 
    105      * (should only be accessed by reader thread of management connection) 
    106      */ 
    107     private HashMap<Integer, ProxyPort> remotePortRegister = new HashMap<Integer, ProxyPort>(); 
    108  
    109     /** Temporary buffer for match checks (only used by bulk reader or connector thread) */ 
    110     private final StringBuilder tmpMatchBuffer = new StringBuilder(); 
    111  
    112     /** Administration interface client port */ 
    113     private final AdminClient adminInterface; 
    114  
    115     /** Remote part's current model node */ 
    116     private RemoteRuntime currentModelNode; 
    117  
    118     /** 
    119      * Remote part's new model node. Is created on new connection. 
    120      * CurrentModelNode is replaced with this, as soon as connection is fully initialized. 
    121      * As this is not published yet, we can operate on this with any thread. 
    122      */ 
    123     private RemoteRuntime newModelNode; 
    124  
    125     /** List with remote ports that have not been initialized yet */ 
    126     private ArrayList<ProxyPort> uninitializedRemotePorts = new ArrayList<ProxyPort>(); 
    127  
    128     /** Next call id to assign to sent call */ 
    129     AtomicLong nextCallId = new AtomicLong(); 
    130  
    131     /** Pull calls that wait for a response */ 
    132     private ArrayList<PullCall> pullCallsAwaitingResponse = new ArrayList<PullCall>(); 
    133  
    13451    /** Number of times disconnect was called, since last connect */ 
    13552    private final AtomicInteger disconnectCalls = new AtomicInteger(0); 
    13653 
    137     /** Has remote part compression support? */ 
    138     private boolean hasCompressionSupport = false; 
    139  
    14054 
    14155    /** Peer info this part is associated with */ 
    142     RemotePart(PeerInfo peerInfo, FrameworkElement parent, TCPPeer peerImplementation) { 
    143         super(parent, peerInfo.toString()); 
     56    RemotePart(PeerInfo peerInfo, FrameworkElement parent, TCPPeer peerImplementation, int partnerHandleStampWidth) { 
     57        super(parent, peerInfo.toString(), peerImplementation.structureExchange == Definitions.StructureExchange.FINSTRUCT, partnerHandleStampWidth); 
    14458        this.peerInfo = peerInfo; 
    14559        this.peerImplementation = peerImplementation; 
    146         adminInterface = (peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.FINSTRUCT) ? 
    147                          new AdminClient("AdminClient " + getName(), peerImplementation.connectionElement) : null; 
    14860    } 
    14961 
    150     /** 
    151      * Add connection for this remote part 
    152      * 
    153      * @param connection Connection to add (with flags set) 
    154      * @return Did this succeed? (fails if there already is a connection for specified types of data; may happen if two parts try to connect at the same time - only one connection is kept) 
    155      */ 
    156     boolean addConnection(TCPConnection connection) { 
    157         if (connection.bulk) { 
    158             if (bulkConnection != null) { 
    159                 return false; 
    160             } 
    161             bulkConnection = connection; 
    162         } else { 
    163             if (expressConnection != null || managementConnection != null) { 
    164                 return false; 
    165             } 
    166             expressConnection = connection; 
    167             managementConnection = connection; 
    168         } 
    169  
    170         if (bulkConnection != null && expressConnection != null && managementConnection != null) { 
    171             peerInfo.connected = true; 
    172             Log.log(LogLevel.DEBUG, this, "Connected to " + peerInfo.toString()); 
    173         } 
    174  
    175         return true; 
     62    /** Whether there is currently a connection to this peer */ 
     63    public boolean isConnected() { 
     64        return getPrimaryConnection() != null && (!((TCPConnection)getPrimaryConnection()).isDisconnected()); 
    17665    } 
    17766 
    178     /** 
    179      * Called during initial structure exchange 
    180      * 
    181      * @param info Info on another remote framework element 
    182      * @param initalStructure Is this call originating from initial structure exchange? 
    183      * @param remoteRuntime Remote runtime object to add structure to 
    184      */ 
    185     void addRemoteStructure(FrameworkElementInfo info, boolean initalStructureExchange) { 
    186         RemoteRuntime remoteRuntime = initalStructureExchange ? newModelNode : currentModelNode; 
    187         long startTime = 0; 
    188         while (remoteRuntime == null && (!initalStructureExchange)) { 
    189             remoteRuntime = currentModelNode; 
    190             Log.log(LogLevel.DEBUG, this, "Waiting for remote model to become ready"); 
    191             if (startTime == 0) { 
    192                 startTime = System.currentTimeMillis(); 
    193             } else if (System.currentTimeMillis() - startTime > 1000) { // We should not block thread forever (as this blocks GUI interaction in e.g. finstruct) 
    194                 throw new RuntimeException("No model node available"); 
    195             } 
    196             try { 
    197                 Thread.sleep(20); 
    198             } catch (InterruptedException e) {} 
    199         } 
    200         Log.log(LogLevel.DEBUG_VERBOSE_1, this, "Adding element: " + info.toString()); 
    201         if (info.isPort()) { 
    202             if ((!hasCompressionSupport) && info.getDataType().getName().equals("finroc.data_compression.CompressionRules")) { 
    203                 hasCompressionSupport = true; 
    204             } 
    205             ProxyPort port = new ProxyPort(info); 
    206             for (int i = 0; i < info.getLinkCount(); i++) { 
    207                 RemoteFrameworkElement remoteElement = new RemotePort(info.getHandle(), info.getLink(i).name, port.getPort(), i); 
    208                 if (i == 0) { 
    209                     remoteRuntime.elementLookup.put(info.getHandle(), remoteElement); 
    210                 } 
    211                 remoteElement.setName(info.getLink(i).name); 
    212                 remoteElement.setTags(info.getTags()); 
    213                 remoteElement.setFlags(info.getFlags()); 
    214                 ModelNode parent = getFrameworkElement(info.getLink(i).parent, remoteRuntime); 
    215                 if (initalStructureExchange) { 
    216                     parent.add(remoteElement); 
    217                 } else { 
    218                     peerImplementation.connectionElement.getModelHandler().addNode(parent, remoteElement); 
    219                     if (peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.SHARED_PORTS) { 
    220                         port.getPort().init(); 
    221                     } else { 
    222                         uninitializedRemotePorts.add(port); 
    223                     } 
    224                 } 
    225             } 
    226         } else { 
    227             RemoteFrameworkElement remoteElement = getFrameworkElement(info.getHandle(), remoteRuntime); 
    228             remoteElement.setName(info.getLink(0).name); 
    229             remoteElement.setTags(info.getTags()); 
    230             remoteElement.setFlags(info.getFlags()); 
    231             ModelNode parent = getFrameworkElement(info.getLink(0).parent, remoteRuntime); 
    232             if (initalStructureExchange) { 
    233                 parent.add(remoteElement); 
    234             } else { 
    235                 peerImplementation.connectionElement.getModelHandler().addNode(parent, remoteElement); 
    236             } 
    237         } 
    238     } 
    239  
    240     /** 
    241      * Creates new model of remote part 
    242      */ 
    243     void createNewModel() { 
    244         newModelNode = new RemoteRuntime(peerInfo.toString(), peerInfo.uuid.toString(), adminInterface, managementConnection.remoteTypes); 
     67    @Override 
     68    public void createNewModel() { 
     69        newModelNode = new org.finroc.core.remote.RemoteRuntime(peerInfo.toString(), peerInfo.uuid.toString(), getAdminInterface(), getPrimaryConnection().getReadBufferStream(), handleStampWidth); 
    24570        newModelNode.setFlags(RuntimeEnvironment.getInstance().getAllFlags()); 
    24671    } 
    24772 
    248     /** 
    249      * Creates qualified link for element of remote framework element model 
    250      * 
    251      * @param remoteElement Element to create link for 
    252      * @return Created Link 
    253      */ 
    254     private String createPortName(RemoteFrameworkElement remoteElement) { 
    255         return remoteElement.getQualifiedLink(); 
    256     } 
    257  
    258     /** 
    259      * Deletes all child elements of remote part 
    260      */ 
    261     void deleteAllChildren() { 
    262         ChildIterator ci = new ChildIterator(this); 
    263         FrameworkElement child; 
    264         while ((child = ci.next()) != null) { 
    265             child.managedDelete(); 
    266         } 
    267         remotePortRegister = new HashMap<Integer, ProxyPort>(); 
    268     } 
    269  
    270     /** 
    271      * Disconnects remote part 
    272      */ 
     73    @Override 
    27374    public void disconnect() { 
    27475        // make sure that disconnect is only called once... prevents deadlocks cleaning up all the threads 
     
    27980 
    28081        synchronized (peerImplementation.connectTo) { 
    281             if (managementConnection != null) { 
    282                 managementConnection.disconnect(); 
     82            if (getPrimaryConnection() != null) { 
     83                getPrimaryConnection().disconnect(); 
    28384            } 
    284             if (expressConnection != null) { 
    285                 expressConnection.disconnect(); 
    286             } 
    287             if (bulkConnection != null) { 
    288                 bulkConnection.disconnect(); 
     85            if (getExpressConnection() != null) { 
     86                getExpressConnection().disconnect(); 
    28987            } 
    29088            disconnectCalls.set(0); 
     
    29290    } 
    29391 
    294     /** 
    295      * @param dataRate Data Rate 
    296      * @return Formatted Data Rate 
    297      */ 
    298     private static String formatRate(int dataRate) { 
    299         if (dataRate < 1000) { 
    300             return "" + dataRate; 
    301         } else if (dataRate < 10000000) { 
    302             return (dataRate / 1000) + "k"; 
    303         } else { 
    304             return (dataRate / 1000000) + "M"; 
    305         } 
    306     } 
    307  
    308     /** 
    309      * @return Connection quality (see ExternalConnection) 
    310      */ 
    311     public float getConnectionQuality() { 
    312         if (bulkConnection == null || expressConnection == null || managementConnection == null || (!peerInfo.connected)) { 
    313             return 0; 
    314         } 
    315         float pingTime = 0; 
    316         for (int i = 0; i < 3; i++) { 
    317             TCPConnection c = (i == 0) ? managementConnection : (i == 1 ? bulkConnection : expressConnection); 
    318             if (c != null) { 
    319                 if (c.pingTimeExceeed()) { 
    320                     return 0; 
    321                 } 
    322                 pingTime = Math.max(pingTime, (float)c.getAvgPingTime()); 
    323             } 
    324         } 
    325         if (pingTime < 300) { 
    326             return 1; 
    327         } else if (pingTime > 1300) { 
    328             return 0; 
    329         } else { 
    330             return ((float)pingTime - 300.0f) / 1000.0f; 
    331         } 
    332     } 
    333  
    334     /** 
    335      * Returns framework element with specified handle. 
    336      * Creates one if it doesn't exist. 
    337      * 
    338      * @param handle Remote handle of framework element 
    339      * @param remoteRuntime Remote runtime model to use for lookup 
    340      * @return Framework element. 
    341      */ 
    342     public RemoteFrameworkElement getFrameworkElement(int handle, RemoteRuntime remoteRuntime) { 
    343         RemoteFrameworkElement remoteElement = remoteRuntime.elementLookup.get(handle); 
    344         if (remoteElement == null) { 
    345             remoteElement = new RemoteFrameworkElement(handle, "(unknown)"); 
    346             remoteRuntime.elementLookup.put(handle, remoteElement); 
    347         } 
    348         return remoteElement; 
    349     } 
    350  
    351     /** 
    352      * @return Framework element that contains all global links (possibly created by call to this) 
    353      */ 
    354     public FrameworkElement getGlobalLinkElement() { 
    355         if (globalLinks == null) { 
    356             globalLinks = new FrameworkElement(this, "global", Flag.NETWORK_ELEMENT | Flag.GLOBALLY_UNIQUE_LINK | Flag.ALTERNATIVE_LINK_ROOT, -1); 
    357         } 
    358         return globalLinks; 
    359     } 
    360  
    361     /** 
    362      * @return String containing ping times 
    363      */ 
    364     public String getPingString() { 
    365         if (!peerInfo.connected) { 
    366             return "disconnected"; 
    367         } 
    368  
    369         int pingAvg = 0; 
    370         int pingMax = 0; 
    371         int dataRate = 0; 
    372         String s = "ping (avg/max/Rx): "; 
    373         if (bulkConnection == null || expressConnection == null) { // should be disconnected... but maybe this is even safer 
    374             return s + "- "; 
    375         } 
    376         for (int i = 0; i < 3; i++) { 
    377             TCPConnection c = (i == 0) ? managementConnection : (i == 1 ? bulkConnection : expressConnection); 
    378             if (c != null) { 
    379                 if (c.pingTimeExceeed()) { 
    380                     return s + "- "; 
    381                 } 
    382                 pingAvg = Math.max(pingAvg, c.getAvgPingTime()); 
    383                 pingMax = Math.max(pingMax, c.getMaxPingTime()); 
    384                 dataRate += c.getRx(); 
    385             } 
    386         } 
    387         return s + pingAvg + "ms/" + pingMax + "ms/" + formatRate(dataRate); 
    388     } 
    389  
    390     /** 
    391      * Initializes part and checks for admin port to connect to 
    392      * 
    393      * @param obsoleteNode Model node that is now obsolete 
    394      */ 
    395     public void initAndCheckForAdminPort(final ModelNode obsoleteNode) { 
    396  
    397         if (peerImplementation.structureExchange != FrameworkElementInfo.StructureExchange.SHARED_PORTS) { 
    398  
    399             // expand port names 
    400             synchronized (getRegistryLock()) { 
    401                 for (ProxyPort pp : remotePortRegister.values()) { 
    402                     RemotePort[] remotePorts = RemotePort.get(pp.getPort()); 
    403                     for (int i = 0; i < remotePorts.length; i++) { 
    404                         pp.getPort().setName(createPortName(remotePorts[i]), i); 
    405                     } 
    406                 } 
    407             } 
    408         } 
    409  
    410         this.init(); 
    411  
    412         // connect to admin interface? 
    413         if (adminInterface != null) { 
    414             FrameworkElement fe = getChildElement(AdministrationService.QUALIFIED_PORT_NAME, false); 
    415             if (fe != null && fe.isPort() && fe.isReady()) { 
    416                 ((AbstractPort)fe).connectTo(adminInterface.getWrapped()); 
    417             } else { 
    418                 Log.log(LogLevel.ERROR, this, "Could not find administration port to connect to."); 
    419             } 
    420         } 
    421  
    422         // set remote type in RemoteRuntime Annotation 
    423         //((RemoteRuntime)getAnnotation(RemoteRuntime.class)).setRemoteTypes(managementConnection.updateTimes); 
    424         final RemoteRuntime oldModel = currentModelNode; 
    425         final RemoteRuntime newModel = newModelNode; 
    426         currentModelNode = newModelNode; 
    427         newModelNode = null; 
    428  
    429         if (oldModel != null) { 
    430             peerImplementation.connectionElement.getModelHandler().removeNode(obsoleteNode); 
    431             peerImplementation.connectionElement.getModelHandler().replaceNode(oldModel, newModel); 
    432         } else { 
    433             peerImplementation.connectionElement.getModelHandler().replaceNode(obsoleteNode, newModel); 
    434         } 
    435     } 
    436  
    437     /** 
    438      * Process message with specified opcode in provided stream 
    439      * 
    440      * @param opCode Opcode of message 
    441      * @param stream Stream to read message from 
    442      * @param remoteTypes Info on remote types 
    443      * @param connection Connection this was called from 
    444      * @param elementInfoBuffer Framework element info buffer to use 
    445      */ 
    446     public void processMessage(OpCode opCode, BinaryInputStream stream, RemoteTypes remoteTypes, TCPConnection connection) throws Exception { 
    447         Log.log(LogLevel.DEBUG_VERBOSE_1, this, "Processing message " + opCode.toString()); 
    448  
    449         switch (opCode) { 
    450         case PORT_VALUE_CHANGE: 
    451         case SMALL_PORT_VALUE_CHANGE: 
    452         case SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP: 
    453             int handle = stream.readInt(); 
    454             Serialization.DataEncoding encoding = stream.readEnum(Serialization.DataEncoding.class); 
    455             AbstractPort port = RuntimeEnvironment.getInstance().getPort(handle); 
    456             if (port != null && port.isReady() && (!FinrocTypeInfo.isMethodType(port.getDataType(), true))) { 
    457                 NetPort netPort = port.asNetPort(); 
    458                 if (netPort != null) { 
    459                     netPort.receiveDataFromStream(stream, encoding, opCode != OpCode.SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP); 
    460                 } 
    461             } 
    462             break; 
    463         case RPC_CALL: 
    464             handle = stream.readInt(); 
    465             CallType callType = stream.readEnum(CallType.class); 
    466             DataTypeBase type = stream.readType(); 
    467             byte methodId = stream.readByte(); 
    468             if (!(type instanceof RPCInterfaceType)) { 
    469                 Log.log(LogLevel.WARNING, this, "Type " + type.getName() + " is no RPC type. Ignoring call."); 
    470                 return; 
    471             } 
    472             RPCInterfaceType rpcInterfaceType = (RPCInterfaceType)type; 
    473  
    474             if (callType == CallType.RPC_MESSAGE || callType == CallType.RPC_REQUEST) { 
    475                 port = RuntimeEnvironment.getInstance().getPort(handle); 
    476                 if (port != null && rpcInterfaceType == port.getDataType()) { 
    477                     //RPCDeserializationScope deserializationScope(message.Get<0>(), connection.rpcCallBufferPools); // TODO? 
    478                     if (callType == CallType.RPC_MESSAGE) { 
    479                         RPCMessage.deserializeAndExecuteCallImplementation(stream, (RPCPort)port, methodId); 
    480                     } else { 
    481                         RPCRequest.deserializeAndExecuteCallImplementation(stream, (RPCPort)port, methodId, connection); 
    482                     } 
    483                 } 
    484             } else { // type is RPC response 
    485                 long callId = stream.readLong(); 
    486  
    487                 AbstractCall callAwaitingThisResponse = null; 
    488                 synchronized (connection.callsAwaitingResponse) { 
    489                     for (TCPConnection.CallAndTimeout call : connection.callsAwaitingResponse) { 
    490                         if (call.call.getCallId() == callId) { 
    491                             callAwaitingThisResponse = call.call; 
    492                             connection.callsAwaitingResponse.remove(call); 
    493                             break; 
    494                         } 
    495                     } 
    496                 } 
    497                 if (callAwaitingThisResponse == null) { // look in other connection; TODO: think about rules which connection to use for RPCs 
    498                     TCPConnection otherConnection = (connection != expressConnection) ? expressConnection : bulkConnection; 
    499                     synchronized (otherConnection.callsAwaitingResponse) { 
    500                         for (TCPConnection.CallAndTimeout call : otherConnection.callsAwaitingResponse) { 
    501                             if (call.call.getCallId() == callId) { 
    502                                 callAwaitingThisResponse = call.call; 
    503                                 otherConnection.callsAwaitingResponse.remove(call); 
    504                                 break; 
    505                             } 
    506                         } 
    507                     } 
    508                 } 
    509                 if (callAwaitingThisResponse != null) { 
    510                     port = RuntimeEnvironment.getInstance().getPort(callAwaitingThisResponse.getLocalPortHandle()); 
    511                     if (port != null) { 
    512                         //RPCDeserializationScope deserializationScope(callAwaitingThisResponse.getLocalPortHandle(), connection.rpcCallBufferPools); // TODO? 
    513                         RPCResponse.deserializeAndExecuteCallImplementation(stream, rpcInterfaceType.getMethod(methodId), connection, callAwaitingThisResponse); 
    514                         return; 
    515                     } 
    516                 } 
    517                 RPCResponse.deserializeAndExecuteCallImplementation(stream, rpcInterfaceType.getMethod(methodId), connection, null); 
    518             } 
    519             break; 
    520         case PULLCALL: 
    521             handle = stream.readInt(); 
    522             long callUid = stream.readLong(); 
    523             encoding = stream.readEnum(Serialization.DataEncoding.class); 
    524  
    525             SerializedTCPCommand pullCallReturn = new SerializedTCPCommand(TCP.OpCode.PULLCALL_RETURN, 8192); 
    526             pullCallReturn.getWriteStream().writeLong(callUid); 
    527             port = RuntimeEnvironment.getInstance().getPort(handle); 
    528             if (port != null && port.isReady() && (!FinrocTypeInfo.isMethodType(port.getDataType(), true))) { 
    529                 pullCallReturn.getWriteStream().writeBoolean(false); 
    530                 if (FinrocTypeInfo.isStdType(port.getDataType())) { 
    531                     CCPortBase pb = (CCPortBase)port; 
    532                     CCPortDataManager manager = pb.getPullInInterthreadContainerRaw(true, true); 
    533                     pullCallReturn.getWriteStream().writeBoolean(true); 
    534                     pullCallReturn.getWriteStream().writeType(manager.getObject().getType()); 
    535                     manager.getTimestamp().serialize(pullCallReturn.getWriteStream()); 
    536                     manager.getObject().serialize(pullCallReturn.getWriteStream(), encoding); 
    537                     manager.recycle2(); 
    538                 } else { 
    539                     PortBase pb = (PortBase)port; 
    540                     PortDataManager manager = pb.getPullLockedUnsafe(true, true); 
    541                     pullCallReturn.getWriteStream().writeBoolean(true); 
    542                     pullCallReturn.getWriteStream().writeType(manager.getType()); 
    543                     manager.getTimestamp().serialize(pullCallReturn.getWriteStream()); 
    544                     manager.getObject().serialize(pullCallReturn.getWriteStream(), encoding); 
    545                     manager.releaseLock(); 
    546                 } 
    547             } else { 
    548                 pullCallReturn.getWriteStream().writeBoolean(false); 
    549             } 
    550             connection.sendCall(pullCallReturn); 
    551             break; 
    552         case PULLCALL_RETURN: 
    553             callUid = stream.readLong(); 
    554             boolean failed = stream.readBoolean(); 
    555  
    556             synchronized (pullCallsAwaitingResponse) { 
    557                 PullCall pullCall = null; 
    558                 for (PullCall pullCallWaiting : pullCallsAwaitingResponse) { 
    559                     if (pullCallWaiting.callId == callUid) { 
    560                         pullCall = pullCallWaiting; 
    561                         break; 
    562                     } 
    563                 } 
    564                 if (pullCall != null) { 
    565                     synchronized (pullCall) { 
    566                         if (!failed) { 
    567                             type = stream.readType(); 
    568                             if (pullCall.origin != null) { 
    569                                 if (pullCall.origin.getDataType() == type) { 
    570                                     PortDataManager manager = pullCall.origin.getUnusedBufferRaw(); 
    571                                     manager.getTimestamp().deserialize(stream); 
    572                                     manager.getObject().deserialize(stream, pullCall.encoding); 
    573                                 } 
    574                             } else if (pullCall.ccResultBuffer != null) { 
    575                                 if (pullCall.ccResultBuffer.getObject().getType() == type) { 
    576                                     pullCall.ccResultBuffer.getTimestamp().deserialize(stream); 
    577                                     pullCall.ccResultBuffer.getObject().deserialize(stream, pullCall.encoding); 
    578                                 } 
    579                             } 
    580                         } 
    581                         pullCall.notify(); 
    582                     } 
    583                 } 
    584             } 
    585             break; 
    586         case TYPE_UPDATE: 
    587             type = stream.readType(); 
    588             remoteTypes.setTime(type, stream.readShort()); 
    589             break; 
    590  
    591 //        case SUBSCRIBE: 
    592 //            // TODO 
    593 //            SubscribeMessage message; 
    594 //            message.deserialize(stream); 
    595 // 
    596 //            // Get or create server port 
    597 //            auto it = serverPortMap.find(message.Get<0>()); 
    598 //            if (it != serverPortMap.end()) { 
    599 //                dataPorts::GenericPort port = it.second; 
    600 //                NetworkPortInfo* networkPortInfo = port.GetAnnotation<NetworkPortInfo>(); 
    601 //                networkPortInfo.setServerSideSubscriptionData(message.Get<1>(), message.Get<2>(), message.Get<3>(), message.Get<5>()); 
    602 // 
    603 //                boolean pushStrategy = message.Get<1>() > 0; 
    604 //                boolean reversePushStrategy = message.Get<2>(); 
    605 //                if (port.getWrapped().pushStrategy() != pushStrategy || port.getWrapped().reversePushStrategy() != reversePushStrategy) { 
    606 //                    // flags need to be changed 
    607 //                    thread::Lock lock(getStructureMutex(), false); 
    608 //                    if (lock.tryLock()) { 
    609 //                        if (port.getWrapped().pushStrategy() != pushStrategy) { 
    610 //                            port.getWrapped().setPushStrategy(pushStrategy); 
    611 //                        } 
    612 //                        if (port.getWrapped().reversePushStrategy() != reversePushStrategy) { 
    613 //                            port.getWrapped().setReversePushStrategy(reversePushStrategy); 
    614 //                        } 
    615 //                    } 
    616 //                    else { 
    617 //                        return true; // We could not obtain lock - try again later 
    618 //                    } 
    619 //                } 
    620 //            } 
    621 //            else { 
    622 //                thread::Lock lock(getStructureMutex(), false); 
    623 //                if (lock.tryLock()) { 
    624 //                    // Create server port 
    625 //                    AbstractPort* port = RuntimeEnvironment::getInstance().getPort(message.Get<0>()); 
    626 //                    if ((!port) || (!port.isReady())) { 
    627 //                        fINROC_LOG_PRINT(DEBUG_WARNING, "Port for subscription not available"); 
    628 //                        return false; 
    629 //                    } 
    630 // 
    631 //                    Flags flags = Flag::NETWORK_ELEMENT | Flag::VOLATILE; 
    632 //                    if (port.isOutputPort()) { 
    633 //                        flags |= Flag::ACCEPTS_DATA; // create input port 
    634 //                    } 
    635 //                    else { 
    636 //                        flags |= Flag::OUTPUT_PORT | Flag::EMITS_DATA; // create output io port 
    637 //                    } 
    638 //                    if (sendStructureInfo != StructureExchange::SHARED_PORTS) { 
    639 //                        flags |= Flag::TOOL_PORT; 
    640 //                    } 
    641 //                    if (message.Get<1>() > 0) { 
    642 //                        flags |= Flag::PUSH_STRATEGY; 
    643 //                    } 
    644 //                    if (message.Get<2>()) { 
    645 //                        flags |= Flag::PUSH_STRATEGY_REVERSE; 
    646 //                    } 
    647 // 
    648 //                    dataPorts::GenericPort createdPort(port.getQualifiedName().substr(1), getServerPortsElement(), port.getDataType(), flags, message.Get<3>()); 
    649 //                    NetworkPortInfo* networkPortInfo = new NetworkPortInfo(*this, message.Get<4>(), message.Get<1>(), true, *createdPort.getWrapped()); 
    650 //                    networkPortInfo.setServerSideSubscriptionData(message.Get<1>(), message.Get<2>(), message.Get<3>(), message.Get<5>()); 
    651 //                    createdPort.addPortListenerForPointer(*networkPortInfo); 
    652 //                    createdPort.init(); 
    653 //                    createdPort.connectTo(port); 
    654 //                    serverPortMap.insert(pair<FrameworkElementHandle, dataPorts::GenericPort>(message.Get<0>(), createdPort)); 
    655 //                    fINROC_LOG_PRINT(DEBUG, "Created server port ", createdPort.getWrapped().getQualifiedName()); 
    656 //                } 
    657 //                else { 
    658 //                    return true; // We could not obtain lock - try again later 
    659 //                } 
    660 //            } 
    661 //            break; 
    662 //        case UNSUBSCRIBE: 
    663 //            // TODO 
    664 //            UnsubscribeMessage message; 
    665 //            message.deserialize(stream); 
    666 //            auto it = serverPortMap.find(message.Get<0>()); 
    667 //            if (it != serverPortMap.end()) { 
    668 //                thread::Lock lock(getStructureMutex(), false); 
    669 //                if (lock.tryLock()) { 
    670 //                    it.second.getWrapped().managedDelete(); 
    671 //                    serverPortMap.erase(message.Get<0>()); 
    672 //                } 
    673 //                else { 
    674 //                    return true; // We could not obtain lock - try again later 
    675 //                } 
    676 //            } 
    677 //            else { 
    678 //                fINROC_LOG_PRINT(DEBUG_WARNING, "Port for unsubscribing not available"); 
    679 //                return false; 
    680 //            } 
    681 //            break; 
    682  
    683         case PEER_INFO: 
    684             PeerInfo info; 
     92    @Override 
     93    public void processMessage(TCP.OpCode opCode, BinaryInputStream stream, Connection connection, BufferedModelChanges modelChanges) throws Exception { 
     94        if (opCode == TCP.OpCode.PEER_INFO) { 
    68595            while (stream.readBoolean()) { 
    68696                PeerInfo peer = peerImplementation.deserializePeerInfo(stream); 
     
    68898                peerImplementation.processIncomingPeerInfo(peer); 
    68999            } 
    690             break; 
    691         default: 
    692             throw new Exception("Opcode " + opCode.toString() + " not implemented yet."); 
    693         } 
    694     } 
    695  
    696     /** 
    697      * Processes buffer containing change events regarding remote runtime. 
    698      * Must be executed from model thread. 
    699      * 
    700      * @param structureBufferToProcess Buffer containing serialized structure changes 
    701      * @param remoteTypes Info on remote types 
    702      */ 
    703     public void processStructurePacket(MemoryBuffer structureBufferToProcess, RemoteTypes remoteTypes) { 
    704         FrameworkElementInfo info = new FrameworkElementInfo(); 
    705         BinaryInputStream stream = new BinaryInputStream(structureBufferToProcess, remoteTypes); 
    706         while (stream.moreDataAvailable()) { 
    707             TCP.OpCode opcode = stream.readEnum(TCP.OpCode.class); 
    708             if (opcode == TCP.OpCode.STRUCTURE_CREATE) { 
    709                 info.deserialize(stream, peerImplementation.structureExchange); 
    710                 addRemoteStructure(info, false); 
    711             } else if (opcode == TCP.OpCode.STRUCTURE_CHANGE) { 
    712                 int handle = stream.readInt(); 
    713                 int flags = stream.readInt(); 
    714                 short strategy = stream.readShort(); 
    715                 short updateInterval = stream.readShort(); 
    716                 ProxyPort port = remotePortRegister.get(handle); 
    717                 if (port != null) { 
    718                     boolean connections = peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.FINSTRUCT; 
    719                     if (connections) { 
    720                         info.deserializeConnections(stream); 
    721                     } 
    722                     port.update(flags, strategy, updateInterval, connections ? info.copyConnections() : null, connections ? info.copyNetworkConnections() : null); 
    723  
    724                     RemotePort[] modelElements = RemotePort.get(port.getPort()); 
    725                     for (RemotePort modelElement : modelElements) { 
    726                         modelElement.setFlags(flags); 
    727                     } 
    728                 } else { 
    729                     RemoteFrameworkElement element = currentModelNode.getRemoteElement(handle); 
    730                     if (element != null) { 
    731                         element.setFlags(flags); 
    732                     } 
    733                 } 
    734             } else if (opcode == TCP.OpCode.STRUCTURE_DELETE) { 
    735                 int handle = stream.readInt(); 
    736                 ProxyPort port = remotePortRegister.get(handle); 
    737                 if (port != null) { 
    738                     RemotePort[] modelElements = RemotePort.get(port.getPort()); 
    739                     port.managedDelete(); 
    740                     for (RemotePort modelElement : modelElements) { 
    741                         peerImplementation.connectionElement.getModelHandler().removeNode(modelElement); 
    742                     } 
    743                     currentModelNode.elementLookup.remove(handle); 
    744                     uninitializedRemotePorts.remove(port); 
    745                 } else { 
    746                     RemoteFrameworkElement element = currentModelNode.getRemoteElement(handle); 
    747                     if (element != null) { 
    748                         peerImplementation.connectionElement.getModelHandler().removeNode(element); 
    749                         currentModelNode.elementLookup.remove(handle); 
    750                     } 
    751                 } 
    752             } else { 
    753                 Log.log(LogLevel.WARNING, this, "Received corrupted structure info. Skipping packet"); 
    754                 return; 
    755             } 
    756             TCPConnection.checkCommandEnd(stream); 
    757         } 
    758  
    759         // Initialize ports whose links are now complete 
    760         synchronized (getRegistryLock()) { 
    761             for (int i = 0; i < uninitializedRemotePorts.size(); i++) { 
    762                 ProxyPort port = uninitializedRemotePorts.get(i); 
    763                 RemotePort[] remotePorts = RemotePort.get(port.getPort()); 
    764                 boolean complete = true; 
    765                 for (RemotePort remotePort : remotePorts) { 
    766                     complete |= remotePort.isNodeAncestor(currentModelNode); 
    767                 } 
    768                 if (complete) { 
    769                     for (int j = 0; j < remotePorts.length; j++) { 
    770                         port.getPort().setName(createPortName(remotePorts[j]), j); 
    771                     } 
    772                     port.getPort().init(); 
    773                     uninitializedRemotePorts.remove(i); 
    774                     i--; 
    775                 } 
    776             } 
     100        } else { 
     101            super.processMessage(opCode, stream, connection, modelChanges); 
    777102        } 
    778103    } 
    779104 
    780105    @Override 
    781     public boolean pullRequest(CCPortBase origin, CCPortDataManagerTL resultBuffer, boolean intermediateAssign) { 
    782         NetPort netport = origin.asNetPort(); 
    783         if (netport != null && expressConnection != null) { 
    784             PullCall pullCall = new PullCall(netport); 
    785             pullCall.ccResultBuffer = resultBuffer; 
    786             synchronized (pullCallsAwaitingResponse) { 
    787                 pullCallsAwaitingResponse.add(pullCall); 
    788             } 
    789             synchronized (pullCall) { 
    790                 expressConnection.sendCall(pullCall); 
    791                 try { 
    792                     pullCall.wait(1000); 
    793                 } catch (InterruptedException e) {} 
    794             } 
    795             synchronized (pullCallsAwaitingResponse) { 
    796                 pullCallsAwaitingResponse.remove(pullCall); 
    797             } 
    798             if (pullCall.ccPullSuccess) { 
    799                 return true; 
    800             } 
    801         } 
    802         origin.getRaw(resultBuffer.getObject(), true); 
    803         return true; 
     106    public void removeConnection(Connection connection) { 
     107        super.removeConnection(connection); 
     108        peerInfo.lastConnection = System.currentTimeMillis(); 
    804109    } 
    805110 
    806111    @Override 
    807     public PortDataManager pullRequest(PortBase origin, byte addLocks, boolean intermediateAssign) { 
    808         NetPort netport = origin.asNetPort(); 
    809         if (netport != null && expressConnection != null) { 
    810             PullCall pullCall = new PullCall(netport); 
    811             pullCall.origin = origin; 
    812             synchronized (pullCallsAwaitingResponse) { 
    813                 pullCallsAwaitingResponse.add(pullCall); 
    814             } 
    815             synchronized (pullCall) { 
    816                 expressConnection.sendCall(pullCall); 
    817                 try { 
    818                     pullCall.wait(1000); 
    819                 } catch (InterruptedException e) {} 
    820             } 
    821             synchronized (pullCallsAwaitingResponse) { 
    822                 pullCallsAwaitingResponse.remove(pullCall); 
    823             } 
    824             if (pullCall.resultBuffer != null) { 
    825                 pullCall.resultBuffer.getCurrentRefCounter().setLocks((byte)(addLocks)); 
    826                 return pullCall.resultBuffer; 
    827             } 
    828         } 
    829         PortDataManager pd = origin.lockCurrentValueForRead(); 
    830         pd.getCurrentRefCounter().addLocks((byte)(addLocks - 1)); // we already have one lock 
    831         return pd; 
    832     } 
    833  
    834     /** 
    835      * Removes connection for this remote part 
    836      * 
    837      * @param connection Connection to remove 
    838      */ 
    839     void removeConnection(TCPConnection connection) { 
    840         if (connection == managementConnection) { 
    841             managementConnection = null; 
    842             deleteAllChildren(); 
    843             globalLinks = null; 
    844             serverPorts = null; 
    845             uninitializedRemotePorts.clear(); 
    846             pullCallsAwaitingResponse.clear(); 
    847             if (currentModelNode != null) { 
    848                 peerImplementation.connectionElement.getModelHandler().removeNode(currentModelNode); 
    849             } 
    850             currentModelNode = null; 
    851         } 
    852         if (connection == expressConnection) { 
    853             expressConnection = null; 
    854         } 
    855         if (connection == bulkConnection) { 
    856             bulkConnection = null; 
    857         } 
    858         if (peerInfo.connected) { 
    859             peerInfo.lastConnection = System.currentTimeMillis(); 
    860         } 
    861         peerInfo.connected = false; 
    862     } 
    863  
    864     /** 
    865      * @param sendStructureInfo Structure information to send to remote part 
    866      */ 
    867     void setDesiredStructureInfo(FrameworkElementInfo.StructureExchange sendStructureInfo) { 
    868         if (sendStructureInfo == FrameworkElementInfo.StructureExchange.NONE) { 
    869             return; 
    870         } 
    871         if (this.sendStructureInfo != FrameworkElementInfo.StructureExchange.NONE && this.sendStructureInfo != sendStructureInfo) { 
    872             Log.log(LogLevel.WARNING, this, "Desired structure info already set to " + this.sendStructureInfo.toString() + ". This is likely to cause trouble."); 
    873         } 
    874         this.sendStructureInfo = sendStructureInfo; 
    875     } 
    876  
    877     /** 
    878      * Local port that acts as proxy for ports on remote machines 
    879      */ 
    880     public class ProxyPort extends TCPPort { 
    881  
    882         /** Has port been found again after reconnect? */ 
    883         private boolean refound = true; 
    884  
    885         /** >= 0 when port has subscribed to server; value of current subscription */ 
    886         private short subscriptionStrategy = -1; 
    887  
    888         /** true, if current subscription includes reverse push strategy */ 
    889         private boolean subscriptionRevPush = false; 
    890  
    891         /** Update time of current subscription */ 
    892         private short subscriptionUpdateTime = -1; 
    893  
    894         /** Handles (remote) of port's outgoing connections */ 
    895         protected ArrayList<FrameworkElementInfo.ConnectionInfo> connections = new ArrayList<FrameworkElementInfo.ConnectionInfo>(); 
    896  
    897         /** Info on port's outgoing network connections */ 
    898         protected ArrayList<FrameworkElementInfo.NetworkConnection> networkConnections = new ArrayList<FrameworkElementInfo.NetworkConnection>(); 
    899  
    900  
    901         /** 
    902          * @param portInfo Port information 
    903          */ 
    904         public ProxyPort(FrameworkElementInfo portInfo) { 
    905             super(createPCI(portInfo), null /*(portInfo.getFlags() & Flag.EXPRESS_PORT) > 0 ? expressConnection : bulkConnection // bulkConnection might be null*/); 
    906             remoteHandle = portInfo.getHandle(); 
    907             remotePortRegister.put(remoteHandle, this); 
    908  
    909             super.updateFlags(portInfo.getFlags()); 
    910             getPort().setMinNetUpdateInterval(portInfo.getMinNetUpdateInterval()); 
    911             updateIntervalPartner = portInfo.getMinNetUpdateInterval(); // TODO redundant? 
    912             propagateStrategyFromTheNet(portInfo.getStrategy()); 
    913             connections = portInfo.copyConnections(); 
    914             networkConnections = portInfo.copyNetworkConnections(); 
    915  
    916             Log.log(LogLevel.DEBUG_VERBOSE_2, this, "Updating port info: " + portInfo.toString()); 
    917             for (int i = 1, n = portInfo.getLinkCount(); i < n; i++) { 
    918                 FrameworkElement parent = portInfo.getLink(i).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 
    919                 getPort().link(parent, portInfo.getLink(i).name); 
    920             } 
    921             FrameworkElement parent = portInfo.getLink(0).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 
    922             getPort().setName(portInfo.getLink(0).name); 
    923             parent.addChild(getPort()); 
    924             FrameworkElementTags.addTags(getPort(), portInfo.getTags()); 
    925  
    926             if (getPort() instanceof CCPortBase) { 
    927                 ((CCPortBase)getPort()).setPullRequestHandler(RemotePart.this); 
    928             } else if (getPort() instanceof PortBase) { 
    929                 ((PortBase)getPort()).setPullRequestHandler(RemotePart.this); 
    930             } 
    931         } 
    932  
    933         public void update(int flags, short strategy, short minNetUpdateInterval, ArrayList<FrameworkElementInfo.ConnectionInfo> newConnections, 
    934                            ArrayList<FrameworkElementInfo.NetworkConnection> newNetworkConnections) { 
    935             updateFlags(flags); 
    936             getPort().setMinNetUpdateInterval(minNetUpdateInterval); 
    937             updateIntervalPartner = minNetUpdateInterval; // TODO redundant? 
    938             propagateStrategyFromTheNet(strategy); 
    939             if (newConnections != null) { 
    940                 connections = newConnections; 
    941             } else if (connections.size() > 0) { 
    942                 connections = new ArrayList<FrameworkElementInfo.ConnectionInfo>(); // create new list for thread-safety reasons 
    943             } 
    944             if (newNetworkConnections != null) { 
    945                 networkConnections = newNetworkConnections; 
    946             } else if (connections.size() > 0) { 
    947                 networkConnections = new ArrayList<FrameworkElementInfo.NetworkConnection>(); // create new list for thread-safety reasons 
    948             } 
    949         } 
    950  
    951 //        /** 
    952 //         * Is port the one that is described by this information? 
    953 //         * 
    954 //         * @param info Port information 
    955 //         * @return Answer 
    956 //         */ 
    957 //        public boolean matches(FrameworkElementInfo info) { 
    958 //            synchronized (getPort()) { 
    959 //                if (remoteHandle != info.getHandle() || info.getLinkCount() != getPort().getLinkCount()) { 
    960 //                    return false; 
    961 //                } 
    962 //                if ((getPort().getAllFlags() & Flag.CONSTANT_FLAGS) != (info.getFlags() & Flag.CONSTANT_FLAGS)) { 
    963 //                    return false; 
    964 //                } 
    965 //                for (int i = 0; i < info.getLinkCount(); i++) { 
    966 //                    if (peerImplementation.structureExchange == FrameworkElementInfo.StructureExchange.SHARED_PORTS) { 
    967 //                        getPort().getQualifiedLink(tmpMatchBuffer, i); 
    968 //                    } else { 
    969 //                        tmpMatchBuffer.delete(0, tmpMatchBuffer.length()); 
    970 //                        tmpMatchBuffer.append(getPort().getLink(i).getName()); 
    971 //                    } 
    972 //                    if (!tmpMatchBuffer.equals(info.getLink(i).name)) { 
    973 //                        return false; 
    974 //                    } 
    975 //                    // parents are negligible if everything else, matches 
    976 //                } 
    977 //                return true; 
    978 //            } 
    979 //        } 
    980  
    981 //        public void reset() { 
    982 //            connection = null; // set connection to null 
    983 //            monitored = false; // reset monitored flag 
    984 //            refound = false; // reset refound flag 
    985 //            propagateStrategyFromTheNet((short)0); 
    986 //            subscriptionRevPush = false; 
    987 //            subscriptionUpdateTime = -1; 
    988 //            subscriptionStrategy = -1; 
    989 //        } 
    990  
    991 //        /** 
    992 //         * Update port properties/information from received port information 
    993 //         * 
    994 //         * @param portInfo Port info 
    995 //         */ 
    996 //        private void updateFromPortInfo(FrameworkElementInfo portInfo, TCP.OpCode opCode) { 
    997 //            synchronized (getPort().getRegistryLock()) { 
    998 //                updateFlags(portInfo.getFlags()); 
    999 //                getPort().setMinNetUpdateInterval(portInfo.getMinNetUpdateInterval()); 
    1000 //                updateIntervalPartner = portInfo.getMinNetUpdateInterval(); // TODO redundant? 
    1001 //                propagateStrategyFromTheNet(portInfo.getStrategy()); 
    1002 //                portInfo.getConnections(connections); 
    1003 // 
    1004 //                log(LogLevel.LL_DEBUG_VERBOSE_2, this, "Updating port info: " + portInfo.toString()); 
    1005 //                if (opCode == TCP.OpCode.STRUCTURE_CREATE) { 
    1006 //                    assert(!getPort().isReady()); 
    1007 //                    for (int i = 1, n = portInfo.getLinkCount(); i < n; i++) { 
    1008 //                        FrameworkElement parent = portInfo.getLink(i).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 
    1009 //                        getPort().link(parent, portInfo.getLink(i).name); 
    1010 //                    } 
    1011 //                    FrameworkElement parent = portInfo.getLink(0).unique ? getGlobalLinkElement() : (FrameworkElement)RemotePart.this; 
    1012 //                    getPort().setName(portInfo.getLink(0).name); 
    1013 //                    parent.addChild(getPort()); 
    1014 //                } 
    1015 //                FrameworkElementTags.addTags(getPort(), portInfo.getTags()); 
    1016 // 
    1017 //                checkSubscription(); 
    1018 //            } 
    1019 //        } 
    1020  
    1021         @Override 
    1022         protected void prepareDelete() { 
    1023             remotePortRegister.remove(remoteHandle); 
    1024             getPort().disconnectAll(); 
    1025             checkSubscription(); 
    1026             super.prepareDelete(); 
    1027         } 
    1028  
    1029         @Override 
    1030         protected void connectionRemoved() { 
    1031             checkSubscription(); 
    1032         } 
    1033  
    1034         @Override 
    1035         protected void connectionAdded() { 
    1036             checkSubscription(); 
    1037         } 
    1038  
    1039         @Override 
    1040         protected void propagateStrategyOverTheNet() { 
    1041             checkSubscription(); 
    1042         } 
    1043  
    1044         @Override 
    1045         protected void checkSubscription() { 
    1046             if (FinrocTypeInfo.isMethodType(getPort().getDataType(), true)) { 
    1047                 return; 
    1048             } 
    1049  
    1050             synchronized (getPort().getRegistryLock()) { 
    1051                 AbstractPort p = getPort(); 
    1052                 boolean revPush = p.isInputPort() && (p.isConnectedToReversePushSources() || p.getOutgoingConnectionCount() > 0); 
    1053                 short time = getUpdateIntervalForNet(); 
    1054                 short strategy = p.isInputPort() ? 0 : p.getStrategy(); 
    1055                 if (!p.isConnected()) { 
    1056                     strategy = -1; 
    1057                 } 
    1058  
    1059                 TCPConnection c = connection; 
    1060  
    1061                 if (c == null) { 
    1062                     subscriptionStrategy = -1; 
    1063                     subscriptionRevPush = false; 
    1064                     subscriptionUpdateTime = -1; 
    1065                 } else if (strategy == -1 && subscriptionStrategy > -1) { // disconnect 
    1066                     //System.out.println("Unsubscribing " + (((long)remoteHandle) + (1L << 32L)) + " " + getPort().getQualifiedName()); 
    1067                     c.unsubscribe(remoteHandle); 
    1068                     subscriptionStrategy = -1; 
    1069                     subscriptionRevPush = false; 
    1070                     subscriptionUpdateTime = -1; 
    1071                 } else if (strategy == -1) { 
    1072                     // still disconnected 
    1073                 } else if (strategy != subscriptionStrategy || time != subscriptionUpdateTime || revPush != subscriptionRevPush) { 
    1074                     boolean requestCompressedData = hasCompressionSupport && getNetworkEncoding() == Serialization.DataEncoding.BINARY && (!peerInfo.uuid.hostName.equals(peerImplementation.thisPeer.uuid)) && 
    1075                                                     isStdType() && getPort().getDataType().getJavaClass() != null && Compressible.class.isAssignableFrom(getPort().getDataType().getJavaClass()); 
    1076  
    1077                     c.subscribe(remoteHandle, strategy, revPush, time, p.getHandle(), requestCompressedData ? Serialization.DataEncoding.BINARY_COMPRESSED : getNetworkEncoding()); 
    1078                     //c.subscribe(remoteHandle, strategy, revPush, time, p.getHandle(), getNetworkEncoding()); 
    1079                     subscriptionStrategy = strategy; 
    1080                     subscriptionRevPush = revPush; 
    1081                     subscriptionUpdateTime = time; 
    1082                 } 
    1083                 setMonitored(publishPortDataOverTheNet() && getPort().isConnected()); 
    1084             } 
    1085         } 
    1086  
    1087         @Override 
    1088         public int getRemoteEdgeDestinations(List<AbstractPort> result) { 
    1089             result.clear(); 
    1090             for (int i = 0; i < connections.size(); i++) { 
    1091                 ProxyPort pp = remotePortRegister.get(connections.get(i).handle); 
    1092                 if (pp != null) { 
    1093                     result.add(pp.getPort()); 
    1094                 } 
    1095             } 
    1096             int numberOfReverseConnections = 0; 
    1097             if (getExtraEdgeProvider() != null) { 
    1098                 numberOfReverseConnections = getExtraEdgeProvider().getRemoteEdgeDestinations(result); 
    1099             } 
    1100             for (FrameworkElementInfo.NetworkConnection connection : networkConnections) { 
    1101                 AbstractPort port = connection.getDestinationPort(currentModelNode); 
    1102                 if (port != null) { 
    1103                     if (connection.isDestinationSource()) { 
    1104                         numberOfReverseConnections++; 
    1105                         result.add(port); 
    1106                     } else { 
    1107                         result.add(result.size() - numberOfReverseConnections, port); 
    1108                     } 
    1109                 } 
    1110             } 
    1111             return result.size() - numberOfReverseConnections; 
    1112         } 
    1113  
    1114         @Override 
    1115         protected void postChildInit() { 
    1116             this.connection = (getPort().getFlag(FrameworkElementFlags.EXPRESS_PORT) || 
    1117                                (getPort().getDataType().getJavaClass() != null && CCType.class.isAssignableFrom(getPort().getDataType().getJavaClass()))) ? expressConnection : bulkConnection; 
    1118             super.postChildInit(); 
    1119         } 
    1120     } 
    1121  
    1122     /** 
    1123      * (Belongs to ProxyPort) 
    1124      * 
    1125      * Create Port Creation info from PortInfo class. 
    1126      * Except from Shared flag port will be identical to original port. 
    1127      * 
    1128      * @param portInfo Port Information 
    1129      * @return Port Creation info 
    1130      */ 
    1131     private static PortCreationInfo createPCI(FrameworkElementInfo portInfo) { 
    1132         PortCreationInfo pci = new PortCreationInfo(portInfo.getFlags()); 
    1133         pci.flags = portInfo.getFlags(); 
    1134  
    1135         // set queue size 
    1136         pci.maxQueueSize = portInfo.getStrategy(); 
    1137  
    1138         pci.dataType = portInfo.getDataType(); 
    1139         pci.lockOrder = LockOrderLevels.REMOTE_PORT; 
    1140  
    1141         return pci; 
    1142     } 
    1143  
    1144     /** 
    1145      * Pull call storage and management 
    1146      */ 
    1147     class PullCall extends SerializedTCPCommand { 
    1148  
    1149         PullCall(NetPort netport) { 
    1150             super(TCP.OpCode.PULLCALL, 16); 
    1151             timeSent = System.currentTimeMillis(); 
    1152             callId = nextCallId.incrementAndGet(); 
    1153             encoding = netport.getNetworkEncoding(); 
    1154             getWriteStream().writeInt(netport.getRemoteHandle()); 
    1155             getWriteStream().writeLong(callId); 
    1156             getWriteStream().writeEnum(encoding); 
    1157         } 
    1158  
    1159         /** Time when call was created/sent */ 
    1160         final long timeSent; 
    1161  
    1162         /** Call id of pull call */ 
    1163         final long callId; 
    1164  
    1165         /** Port call originates from - in case of a standard port */ 
    1166         PortBase origin; 
    1167  
    1168         /** Buffer with result */ 
    1169         PortDataManager resultBuffer; 
    1170  
    1171         /** Result buffer for CC port */ 
    1172         CCPortDataManagerTL ccResultBuffer; 
    1173  
    1174         /** Was CC Pull successful? */ 
    1175         boolean ccPullSuccess; 
    1176  
    1177         /** Data encoding to use */ 
    1178         Serialization.DataEncoding encoding; 
     112    public ModelHandler getModelHandler() { 
     113        return peerImplementation.connectionElement.getModelHandler(); 
    1179114    } 
    1180115} 
  • internal/TCP.java

    r95 r114  
    2929import org.finroc.core.RuntimeEnvironment; 
    3030import org.finroc.core.RuntimeSettings; 
    31 import org.finroc.core.datatype.FrameworkElementInfo; 
     31import org.finroc.core.net.generic_protocol.Definitions; 
    3232import org.finroc.core.parameter.ConstructorParameters; 
    3333import org.finroc.core.parameter.StaticParameterList; 
     
    4545 * Plugin for P2P TCP connections 
    4646 */ 
    47 public class TCP implements Plugin { 
     47public class TCP extends Definitions implements Plugin { 
    4848 
    4949    /** Singleton instance of TCP plugin */ 
     
    5858 
    5959    /** 
    60      * Protocol OpCodes 
     60     * Flags for connection properties 
    6161     */ 
    62     public enum OpCode { 
    63  
    64         // Opcodes for management connection 
    65         SUBSCRIBE,         // Subscribe to data port 
    66         UNSUBSCRIBE,       // Unsubscribe from data port 
    67         PULLCALL,          // Pull call 
    68         PULLCALL_RETURN,   // Returning pull call 
    69         RPC_CALL,          // RPC call 
    70         TYPE_UPDATE,       // Update on remote type info (typically desired update time) 
    71         STRUCTURE_CREATE,  // Update on remote framework elements: Element created 
    72         STRUCTURE_CHANGE,  // Update on remote framework elements: Port changed 
    73         STRUCTURE_DELETE,  // Update on remote framework elements: Element deleted 
    74         PEER_INFO,         // Information about other peers 
    75  
    76         // Change event opcodes (from subscription - or for plain setting of port) 
    77         PORT_VALUE_CHANGE,                         // normal variant 
    78         SMALL_PORT_VALUE_CHANGE,                   // variant with max. 256 byte message length (3 bytes smaller than normal variant) 
    79         SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP, // variant with max. 256 byte message length and no timestamp (11 bytes smaller than normal variant) 
    80  
    81         // Used for messages without opcode 
    82         OTHER 
    83     } 
    84  
    85     public enum MessageSize { 
    86         FIXED,                   // fixed size message 
    87         VARIABLE_UP_TO_255_BYTE, // variable message size up to 255 bytes 
    88         VARIABLE_UP_TO_4GB       // variable message size up to 4GB 
    89     }; 
     62    public static final int 
     63    PRIMARY_CONNECTION = 0x1,  //<! This connection is used to transfer management data (e.g. available ports, peer info, subscriptions, rpc calls) 
     64    EXPRESS_DATA    = 0x2,  //<! This connection transfers "express data" (port values of express ports) - candidate for UDP in the future 
     65    BULK_DATA       = 0x4,  //<! This connection transfers "bulk data" (port values of bulk ports) - candidate for UDP in the future 
     66    JAVA_PEER       = 0x8,  //<! Sent by Java Peers on connection initialization 
     67    NO_DEBUG        = 0x10; //<! No debug information in protocol 
    9068 
    9169    /** Mode of peer */ 
     
    9573        FULL          // Peer is client and server 
    9674    } 
    97  
    98     /** 
    99      * Flags for connection properties 
    100      */ 
    101     public static final int 
    102     MANAGEMENT_DATA = 0x1, //<! This connection is used to transfer management data (e.g. available ports, peer info, subscriptions, rpc calls) 
    103     EXPRESS_DATA    = 0x2, //<! This connection transfers "express data" (port values of express ports) - candidate for UDP in the future 
    104     BULK_DATA       = 0x4; //<! This connection transfers "bulk data" (port values of bulk ports) - candidate for UDP in the future 
    105  
    106     /** Message size encodings of different kinds of op codes */ 
    107     public static final MessageSize[] MESSAGE_SIZES = new MessageSize[] { 
    108         MessageSize.FIXED,                   // SUBSCRIBE 
    109         MessageSize.FIXED,                   // UNSUBSCRIBE 
    110         MessageSize.FIXED,                   // PULLCALL 
    111         MessageSize.VARIABLE_UP_TO_4GB,      // PULLCALL_RETURN 
    112         MessageSize.VARIABLE_UP_TO_4GB,      // RPC_CALL 
    113         MessageSize.VARIABLE_UP_TO_4GB,      // UPDATE_TIME 
    114         MessageSize.VARIABLE_UP_TO_4GB,      // STRUCTURE_CREATE 
    115         MessageSize.VARIABLE_UP_TO_4GB,      // STRUCTURE_CHANGE 
    116         MessageSize.FIXED,                   // STRUCTURE_DELETE 
    117         MessageSize.VARIABLE_UP_TO_4GB,      // PEER_INFO 
    118         MessageSize.VARIABLE_UP_TO_4GB,      // PORT_VALUE_CHANGE 
    119         MessageSize.VARIABLE_UP_TO_255_BYTE, // SMALL_PORT_VALUE_CHANGE 
    120         MessageSize.VARIABLE_UP_TO_255_BYTE, // SMALL_PORT_VALUE_CHANGE_WITHOUT_TIMESTAMP 
    121         MessageSize.VARIABLE_UP_TO_4GB,      // OTHER 
    122     }; 
    12375 
    12476    /** Return Status */ 
     
    13183    public static final String GREET_MESSAGE = "Greetings! I am a Finroc TCP peer."; 
    13284 
    133     /** Finroc TCP protocol version */ 
    134     public static final short PROTOCOL_VERSION = 1; 
    135  
    136     /** Inserted at the end of messages to debug TCP stream */ 
    137     public static final byte DEBUG_TCP_NUMBER = (byte)0xCD; 
    138  
    13985    /** Standard TCP connection creator */ 
    140     public static final CreateAction creator1 = new CreateAction(FrameworkElementInfo.StructureExchange.COMPLETE_STRUCTURE, "TCP", 0); 
     86    public static final CreateAction creator1 = new CreateAction(Definitions.StructureExchange.COMPLETE_STRUCTURE, "TCP", 0); 
    14187 
    14288    /** Alternative TCP connection creator */ 
    143     public static final CreateAction creator2 = new CreateAction(FrameworkElementInfo.StructureExchange.SHARED_PORTS, TCP_PORTS_ONLY_NAME, 0); 
     89    public static final CreateAction creator2 = new CreateAction(Definitions.StructureExchange.SHARED_PORTS, TCP_PORTS_ONLY_NAME, 0); 
    14490 
    14591    /** Complete TCP connection creator */ 
    146     public static final CreateAction creator3 = new CreateAction(FrameworkElementInfo.StructureExchange.FINSTRUCT, "TCP finstruct", CreateExternalConnectionAction.REMOTE_EDGE_INFO); 
     92    public static final CreateAction creator3 = new CreateAction(Definitions.StructureExchange.FINSTRUCT, "TCP finstruct", CreateExternalConnectionAction.REMOTE_EDGE_INFO); 
    14793 
    14894 
     
    198144 
    199145        /** Desired structure exchange */ 
    200         private final FrameworkElementInfo.StructureExchange structureExchange; 
     146        private final Definitions.StructureExchange structureExchange; 
    201147 
    202148        /** Name of connection type */ 
     
    209155        private final String group; 
    210156 
    211         public CreateAction(FrameworkElementInfo.StructureExchange structureExchange, String name, int flags) { 
     157        public CreateAction(Definitions.StructureExchange structureExchange, String name, int flags) { 
    212158            this.structureExchange = structureExchange; 
    213159            this.name = name; 
  • internal/TCPConnection.java

    r110 r114  
    3535import org.rrlib.finroc_core_utils.jc.AtomicDoubleInt; 
    3636import org.rrlib.finroc_core_utils.jc.HasDestructor; 
    37 import org.rrlib.finroc_core_utils.jc.MutexLockOrder; 
    38 import org.rrlib.finroc_core_utils.jc.Time; 
    39 import org.rrlib.finroc_core_utils.jc.container.SafeConcurrentlyIterableList; 
    4037import org.rrlib.finroc_core_utils.jc.container.WonderQueue; 
    4138import org.rrlib.logging.Log; 
     
    4542import org.rrlib.serialization.InputStreamSource; 
    4643import org.rrlib.serialization.MemoryBuffer; 
    47 import org.rrlib.serialization.Serialization; 
     44import org.rrlib.serialization.SerializationInfo; 
    4845import org.rrlib.serialization.rtti.DataTypeBase; 
    4946 
    50 import org.finroc.core.LockOrderLevels; 
    51 import org.finroc.core.RuntimeEnvironment; 
    52 import org.finroc.core.RuntimeSettings; 
    5347import org.finroc.core.datatype.CoreString; 
    54 import org.finroc.core.datatype.FrameworkElementInfo; 
    55 import org.finroc.core.parameter.ParameterNumeric; 
     48import org.finroc.core.net.generic_protocol.Connection; 
     49import org.finroc.core.net.generic_protocol.Definitions; 
     50import org.finroc.core.net.generic_protocol.RemoteProxyPort; 
     51import org.finroc.core.net.generic_protocol.SerializedMessage; 
    5652import org.finroc.core.port.AbstractPort; 
    5753import org.finroc.core.port.ThreadLocalCache; 
    58 import org.finroc.core.port.net.UpdateTimeChangeListener; 
    5954import org.finroc.core.port.rpc.FutureStatus; 
    6055import org.finroc.core.port.rpc.internal.AbstractCall; 
    61 import org.finroc.core.port.rpc.internal.ResponseSender; 
     56import org.finroc.core.remote.BufferedModelChanges; 
     57import org.finroc.core.remote.ModelHandler; 
    6258import org.finroc.core.remote.ModelNode; 
    63 import org.finroc.core.remote.RemoteTypes; 
     59import org.finroc.core.remote.RemoteType; 
    6460import org.finroc.core.thread.CoreLoopThreadBase; 
    6561import org.finroc.plugins.tcp.TCPSettings; 
    66 import org.finroc.plugins.tcp.internal.TCP.OpCode; 
    6762 
    6863/** 
     
    7368 * (writer and listener members need to be initialized by subclass) 
    7469 */ 
    75 class TCPConnection implements UpdateTimeChangeListener, ResponseSender { 
     70public class TCPConnection extends Connection { 
    7671 
    7772    /** Network Socket used for accessing remote Server */ 
    7873    private final Socket socket; 
    7974 
    80     /** default connection times of connection partner */ 
    81     final RemoteTypes remoteTypes = new RemoteTypes(); 
    82  
    8375    /** Socket's output stream */ 
    8476    private final OutputStream socketOutputStream; 
    8577 
    86     /** Buffer for writing data to stream */ 
    87     private final MemoryBuffer writeBuffer = new MemoryBuffer(MemoryBuffer.DEFAULT_SIZE, MemoryBuffer.DEFAULT_RESIZE_FACTOR, false); 
    88  
    89     /** Output Stream for sending data to remote Server */ 
    90     private final BinaryOutputStream writeBufferStream = new BinaryOutputStream(writeBuffer, remoteTypes); 
    91  
    92     /** Input Stream for receiving data ro remote Server */ 
    93     private final BinaryInputStream readBufferStream; 
    94  
    95     /** Reference to remote part this connection belongs to */ 
    96     private final RemotePart remotePart; 
    97  
    9878    /** Writer Thread */ 
    9979    private Writer writer; 
     
    10282    private Reader reader; 
    10383 
    104     /** List with calls that wait for a response */ 
    105     final ArrayList<CallAndTimeout> callsAwaitingResponse = new ArrayList<CallAndTimeout>(); 
    106  
    107     /** References to Connection parameters */ 
    108     private ParameterNumeric<Integer> minUpdateInterval; 
    109     private ParameterNumeric<Integer> maxNotAcknowledgedPackets; 
    110  
    111     /** Index of last acknowledged sent packet */ 
    112     private volatile int lastAcknowledgedPacket = 0; 
    113  
    114     /** Index of last acknowledgement request that was received */ 
    115     private volatile int lastAckRequestIndex = 0; 
    116  
    117     /** Timestamp of when packet n was sent (Index is n % MAX_NOT_ACKNOWLEDGED_PACKETS => efficient and safe implementation (ring queue)) */ 
    118     private final long[] sentPacketTime = new long[TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS + 1]; 
    119  
    120     /** Ping time for last packages (Index is n % AVG_PING_PACKETS => efficient and safe implementation (ring queue)) */ 
    121     private final int[] pingTimes = new int[TCPSettings.AVG_PING_PACKETS + 1]; 
    122  
    123     /** Ping time statistics */ 
    124     private volatile int avgPingTime, maxPingTime; 
    125  
    126     /** Signal for disconnecting */ 
    127     private volatile boolean disconnectSignal = false; 
    128  
    129     /** Connection type - BULK or EXPRESS */ 
    130     protected final boolean bulk; 
    131  
    132     /** Ports that are monitored for changes by this connection and should be checked for modifications */ 
    133     protected SafeConcurrentlyIterableList<TCPPort> monitoredPorts = new SafeConcurrentlyIterableList<TCPPort>(50, 4); 
    134  
    13584    /** TCPPeer that this connection belongs to (null if it does not belong to a peer) */ 
    13685    protected final TCPPeer peer; 
     
    13988    protected int lastPeerInfoSentRevision = -1; 
    14089 
    141     /** Rx related: last time RX was retrieved */ 
    142     protected long lastRxTimestamp = 0; 
    143  
    144     /** Rx related: last time RX was retrieved: how much have we received in total? */ 
    145     protected long lastRxPosition = 0; 
    146  
    147     /** Needs to be locked after framework elements, but before runtime registry */ 
    148     public final MutexLockOrder objMutex = new MutexLockOrder(LockOrderLevels.REMOTE + 1); 
    149  
    150     /** Log description for connection */ 
    151     protected String description; 
    152  
    153     /** Temporary buffer with port information */ 
    154     private final FrameworkElementInfo tempFrameworkElementInfo = new FrameworkElementInfo(); 
    155  
     90    /** Whether connection has been disconnected - or is disconnected */ 
     91    private volatile boolean disconnected = false; 
    15692 
    15793    /** 
     
    169105        this.socket = socket; 
    170106        this.socketOutputStream = socket.getOutputStream(); 
    171         this.readBufferStream = new BinaryInputStream(new InputStreamSource(socket.getInputStream()), remoteTypes); 
     107        this.readBufferStream = new BinaryInputStream(new InputStreamSource(socket.getInputStream()), INITIAL_SERIALIZATION_INFO); 
    172108 
    173109        // Initialize connection 
    174110        // Write... 
    175111        writeBufferStream.writeString(TCP.GREET_MESSAGE); 
    176         writeBufferStream.writeShort(TCP.PROTOCOL_VERSION); 
     112        writeBufferStream.writeShort(Definitions.PROTOCOL_VERSION_MAJOR); 
    177113        writeBufferStream.writeSkipOffsetPlaceholder(); 
    178114        // ConnectionInitMessage 
     
    181117        writeBufferStream.writeString(peer.thisPeer.name); 
    182118        writeBufferStream.writeEnum(peer.structureExchange); 
    183         writeBufferStream.writeInt((connectionFlags & TCP.BULK_DATA) != 0 ? TCP.BULK_DATA : (TCP.EXPRESS_DATA | TCP.MANAGEMENT_DATA)); 
     119        writeBufferStream.writeInt(((connectionFlags & TCP.PRIMARY_CONNECTION) != 0 ? (TCP.BULK_DATA | TCP.PRIMARY_CONNECTION) : TCP.EXPRESS_DATA) | TCP.JAVA_PEER | (Definitions.PROTOCOL_VERSION_MINOR << 16)); 
    184120        TCP.serializeInetAddress(writeBufferStream, socket.getInetAddress()); 
    185         if (TCPSettings.DEBUG_TCP) { 
    186             writeBufferStream.writeByte(TCP.DEBUG_TCP_NUMBER); 
    187         } 
     121        writeBufferStream.writeByte(Definitions.DEBUG_TCP_NUMBER); 
    188122        writeBufferStream.skipTargetHere(); 
    189123        writeBufferToNetwork(); 
     
    197131            throw new ConnectException("Partner does not speak Finroc protocol"); 
    198132        } 
    199         if (readBufferStream.readShort() != TCP.PROTOCOL_VERSION) { 
     133        if (readBufferStream.readShort() != Definitions.PROTOCOL_VERSION_MAJOR) { 
    200134            Log.log(LogLevel.WARNING, this, "Connection partner has wrong protocol version"); 
    201135            throw new ConnectException("Partner has wrong protocol version"); 
     
    206140        TCP.PeerType peerType = readBufferStream.readEnum(TCP.PeerType.class); 
    207141        String peerName = readBufferStream.readString(); 
    208         FrameworkElementInfo.StructureExchange structureExchange = readBufferStream.readEnum(FrameworkElementInfo.StructureExchange.class); // TODO: send structure 
    209         connectionFlags |= readBufferStream.readInt(); 
    210         this.bulk = (connectionFlags & TCP.BULK_DATA) != 0; 
     142        Definitions.StructureExchange structureExchange = readBufferStream.readEnum(Definitions.StructureExchange.class); // TODO: send structure 
     143        int flags = readBufferStream.readInt(); 
     144        int partnerFlags = flags & 0xFF; 
     145        int partnerSerializationRevision = flags >> 16; 
     146        boolean legacyPartnerRuntime = partnerSerializationRevision == 0; 
     147        int partnerHandleStampWidth = legacyPartnerRuntime ? 12 : ((flags >> 8) & 0xFF); 
     148        connectionFlags |= partnerFlags; 
     149        this.primary = (connectionFlags & TCP.PRIMARY_CONNECTION) != 0; 
    211150        InetAddress myAddress = TCP.deserializeInetAddress(readBufferStream); 
    212         checkCommandEnd(readBufferStream); 
     151        boolean debugTcp = legacyPartnerRuntime || (!((connectionFlags & TCP.NO_DEBUG) != 0 || (flags & TCP.NO_DEBUG) != 0)); 
     152        checkCommandEnd(readBufferStream);  // requires serialization info to bet set up 
    213153 
    214154        // Adjust peer management information 
    215155        synchronized (peer.connectTo) { 
    216156            peer.addOwnAddress(myAddress); 
    217             remotePart = peer.getRemotePart(uuid, peerType, peerName, socket.getInetAddress(), neverForget); 
    218             remotePart.peerInfo.connecting = true; 
    219             if (!remotePart.addConnection(this)) { 
     157            remoteRuntime = peer.getRemotePart(uuid, peerType, peerName, socket.getInetAddress(), neverForget, partnerHandleStampWidth); 
     158            ((RemotePart)remoteRuntime).peerInfo.connecting = true; 
     159            if (!remoteRuntime.addConnection(this)) { 
    220160                throw new ConnectException("Connection already established. Closing."); 
    221161            } 
    222             if (peer.thisPeer.peerType != TCP.PeerType.CLIENT_ONLY) { 
    223                 remotePart.setDesiredStructureInfo(structureExchange); 
    224             } 
     162        } 
     163 
     164        // Setup serialization info 
     165        int otherSerializationFlags = structureExchange.ordinal() | (debugTcp ? Definitions.INFO_FLAG_DEBUG_PROTOCOL : 0); 
     166        int otherDeserializationFlags = peer.structureExchange.ordinal() | Definitions.INFO_FLAG_JAVA_CLIENT | (debugTcp ? Definitions.INFO_FLAG_DEBUG_PROTOCOL : 0); 
     167        boolean javaPartner = (partnerFlags & TCP.JAVA_PEER) != 0; 
     168        boolean primaryConnection = (partnerFlags & TCP.PRIMARY_CONNECTION) != 0; 
     169        boolean expressOnlyConnection = (partnerFlags & TCP.EXPRESS_DATA) != 0 && (!primaryConnection) || (connectionFlags & TCP.EXPRESS_DATA) != 0; 
     170        if (legacyPartnerRuntime) { 
     171            SerializationInfo legacySerialization = new SerializationInfo(0, INITIAL_SERIALIZATION_INFO.getRegisterEntryEncodings(), otherSerializationFlags); 
     172            writeBufferStream.setSerializationTarget(legacySerialization); 
     173            SerializationInfo legacyDeserialization = new SerializationInfo(0, INITIAL_SERIALIZATION_INFO.getRegisterEntryEncodings(), otherDeserializationFlags); 
     174            readBufferStream.setSerializationSource(legacyDeserialization); 
     175        } else if (expressOnlyConnection) { 
     176            writeBufferStream.setSharedSerializationInfo(remoteRuntime.getPrimaryConnection().getWriteBufferStream()); 
     177            readBufferStream.setSharedSerializationInfo(remoteRuntime.getPrimaryConnection().getReadBufferStream()); 
     178        } else { 
     179            int revision = Math.min(Definitions.PROTOCOL_VERSION_MINOR, partnerSerializationRevision); 
     180            int writeEncoding = INITIAL_SERIALIZATION_INFO.getRegisterEntryEncodings(); 
     181            int javaEncoding = SerializationInfo.setRegisterEntryEncoding(SerializationInfo.setDefaultRegisterEntryEncoding(SerializationInfo.RegisterEntryEncoding.PUBLISH_REGISTER_ON_CHANGE, Definitions.RegisterUIDs.values().length), Definitions.RegisterUIDs.CREATE_ACTION.ordinal(), SerializationInfo.RegisterEntryEncoding.PUBLISH_REGISTER_ON_DEMAND); 
     182            if (javaPartner) { 
     183                writeEncoding = javaEncoding; 
     184                otherSerializationFlags |= Definitions.INFO_FLAG_JAVA_CLIENT; 
     185            } 
     186            writeBufferStream.setSerializationTarget(new SerializationInfo(revision, writeEncoding, otherSerializationFlags)); 
     187            readBufferStream.setSerializationSource(new SerializationInfo(revision, javaEncoding, otherDeserializationFlags)); 
    225188        } 
    226189 
    227190        // set params 
    228         minUpdateInterval = bulk ? TCPSettings.getInstance().minUpdateIntervalBulk : TCPSettings.getInstance().minUpdateIntervalExpress; 
    229         maxNotAcknowledgedPackets = bulk ? TCPSettings.getInstance().maxNotAcknowledgedPacketsBulk : TCPSettings.getInstance().maxNotAcknowledgedPacketsExpress; 
     191        minUpdateInterval = primary ? TCPSettings.getInstance().minUpdateIntervalBulk : TCPSettings.getInstance().minUpdateIntervalExpress; 
     192        maxNotAcknowledgedPackets = primary ? TCPSettings.getInstance().maxNotAcknowledgedPacketsBulk : TCPSettings.getInstance().maxNotAcknowledgedPacketsExpress; 
    230193 
    231194        // Init connection 
    232195        readBufferStream.setTimeout(-1); 
    233         String typeString = bulk ? "Bulk" : "Express"; 
     196        String typeString = primary ? "Primary" : "Express"; 
    234197 
    235198        // Initialize them here, so that subscribe calls from structure creation do not get lost 
     
    238201 
    239202        // Get framework elements from connection partner 
    240         if ((connectionFlags & TCP.MANAGEMENT_DATA) != 0) { 
    241             peer.connectionElement.getModelHandler().changeNodeName(modelNode, "Connecting to " + remotePart.peerInfo.toString() + "..."); 
     203        if ((connectionFlags & TCP.PRIMARY_CONNECTION) != 0) { 
     204            BufferedModelChanges change = new BufferedModelChanges(); 
     205            change.changeNodeName(modelNode, "Connecting to " + ((RemotePart)remoteRuntime).peerInfo.toString() + "..."); 
     206            getModelHandler().applyModelChanges(change); 
     207 
     208 
    242209            //ModelNode statusNode = new ModelNode("Obtaining structure information..."); 
    243210            //peer.connectionElement.getModelHandler().addNode(modelNode, statusNode); 
    244211 
    245212            // retrieveRemotePorts(cis, cos, updateTimes, newServer); 
    246             boolean newServer = true; /*(serverCreationTime < 0) || (serverCreationTime != timeBase);*/ 
     213            //boolean newServer = true; /*(serverCreationTime < 0) || (serverCreationTime != timeBase);*/ 
    247214            //log(LogLevel.LL_DEBUG, this, (newServer ? "Connecting" : "Reconnecting") + " to server " + uuid.toString() + "..."); 
    248215 
    249             // Delete any elements from previous connections 
    250             remotePart.deleteAllChildren(); 
    251             remotePart.createNewModel(); 
     216            // Delete any elements from previous connections (no longer necessary) 
     217            //remoteRuntime.deleteAllChildren(); 
     218            remoteRuntime.createNewModel(); 
    252219 
    253220            /*portIterator.reset(); 
     
    264231                // Process structure packets 
    265232                int structurePacketSize = readBufferStream.readInt(); 
    266                 boolean readType = peer.structureExchange == FrameworkElementInfo.StructureExchange.SHARED_PORTS; 
     233                boolean readType = peer.structureExchange == Definitions.StructureExchange.SHARED_PORTS; 
    267234                MemoryBuffer structurePacketBuffer = new MemoryBuffer(structurePacketSize); 
    268                 BinaryInputStream structurePacketReadStream = new BinaryInputStream(structurePacketBuffer, remoteTypes); 
     235                BinaryInputStream structurePacketReadStream = new BinaryInputStream(structurePacketBuffer, readBufferStream); 
    269236                while (structurePacketSize != 0) { 
    270237                    structurePacketBuffer.setSize(structurePacketSize); 
     
    272239                    structurePacketReadStream.reset(); 
    273240                    if (readType) { 
    274                         DataTypeBase type = structurePacketReadStream.readType(); 
     241                        DataTypeBase type = RemoteType.deserialize(structurePacketReadStream).getDefaultLocalDataType(); 
    275242                        if (type == null || type != CoreString.TYPE) { 
    276243                            Log.log(LogLevel.WARNING, this, "Type encoding does not seem to work"); 
     
    280247                    } 
    281248                    while (structurePacketReadStream.moreDataAvailable()) { 
    282                         tempFrameworkElementInfo.deserialize(structurePacketReadStream, peer.structureExchange); 
    283                         remotePart.addRemoteStructure(tempFrameworkElementInfo, true); 
     249                        tempFrameworkElementInfo.deserialize(structurePacketReadStream, true); 
     250                        remoteRuntime.addRemoteStructure(tempFrameworkElementInfo, true, null); 
    284251                    } 
    285252                    structurePacketSize = readBufferStream.readInt(); 
    286253                } 
    287                 // remotePart.initAndCheckForAdminPort(modelNode); do this later: after bulk connection has been initialized 
     254                // remoteRuntime.initAndCheckForAdminPort(modelNode); do this later: after bulk connection has been initialized 
    288255            } catch (Exception e) { 
    289256                if (e.getCause() instanceof EOFException) { 
     
    293260                } 
    294261                //peer.connectionElement.getModelHandler().removeNode(statusNode); 
    295                 remotePart.removeConnection(this); 
     262                remoteRuntime.removeConnection(this); 
    296263                throw e; 
    297264            } 
     
    309276    } 
    310277 
    311     /** 
    312      * @return Is TCP connection disconnecting? 
    313      */ 
    314     public boolean disconnecting() { 
    315         return disconnectSignal; 
    316     } 
    317  
    318     /** 
    319      * @return Type of connection ("Bulk" oder "Express") 
    320      */ 
    321     public String getConnectionTypeString() { 
    322         return bulk ? "Bulk" : "Express"; 
    323     } 
    324  
    325     /** 
    326      * Close connection 
    327      */ 
     278    @Override 
    328279    public synchronized void disconnect() { 
    329280        disconnectSignal = true; 
    330         RuntimeSettings.getInstance().removeUpdateTimeChangeListener(this); 
    331281        synchronized (peer.connectTo) { 
    332             remotePart.removeConnection(this); 
     282            remoteRuntime.removeConnection(this); 
    333283        } 
    334284        notifyWriter(); // stops writer 
     
    360310 
    361311    /** 
    362      * Check that command is terminated correctly when TCPSettings.DEBUG_TCP is activated 
    363      */ 
    364     static void checkCommandEnd(BinaryInputStream stream) { 
    365         if (TCPSettings.DEBUG_TCP) { 
    366             int i = stream.readByte(); 
    367             if (i != TCP.DEBUG_TCP_NUMBER) { 
    368                 throw new RuntimeException("TCP Stream seems corrupt"); 
    369             } 
    370         } 
    371     } 
    372  
    373     /** 
    374312     * Listens at socket for incoming data 
    375313     */ 
     
    386324            initThreadLocalCache(); 
    387325            BinaryInputStream stream = TCPConnection.this.readBufferStream; 
    388             MemoryBuffer structureBuffer = new MemoryBuffer(); // structure changes are copied to this buffer and processed by model (thread) 
    389             BinaryOutputStream structureBufferWriter = new BinaryOutputStream(structureBuffer); 
    390             byte[] tempBuffer = new byte[2048]; 
     326            BufferedModelChanges modelChanges = new BufferedModelChanges(); 
    391327 
    392328            try { 
     
    410346                        while (lastAcknowledgedPacket != acknowledgement) { 
    411347                            lastAcknowledgedPacket = (lastAcknowledgedPacket + 1) & 0x7FFF; 
    412                             pingTimes[lastAcknowledgedPacket & TCPSettings.AVG_PING_PACKETS] = 
    413                                 (int)(curTime - sentPacketTime[lastAcknowledgedPacket & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS]); 
     348                            pingTimes[lastAcknowledgedPacket & Definitions.AVG_PING_PACKETS] = 
     349                                (int)(curTime - sentPacketTime[lastAcknowledgedPacket & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS]); 
    414350                        } 
    415351                        updatePingStatistics(); 
     
    417353 
    418354                    while (stream.getAbsoluteReadPosition() < nextPacketStart) { 
    419                         TCP.OpCode opCode = stream.readEnum(TCP.OpCode.class); 
    420                         if (opCode.ordinal() >= TCP.OpCode.OTHER.ordinal()) { 
     355                        Definitions.OpCode opCode = stream.readEnum(Definitions.OpCode.class); 
     356                        if (opCode.ordinal() >= Definitions.OpCode.OTHER.ordinal()) { 
    421357                            Log.log(LogLevel.WARNING, this, "Received corrupted TCP message batch. Invalid opcode. Skipping."); 
    422358                            stream.skip((int)(nextPacketStart - stream.getAbsoluteReadPosition())); 
    423359                            break; 
    424360                        } 
    425                         TCP.MessageSize messageSizeEncoding = TCP.MESSAGE_SIZES[opCode.ordinal()]; 
    426                         long messageSize = messageSizeEncoding == TCP.MessageSize.FIXED ? -1 : 
    427                                            (messageSizeEncoding == TCP.MessageSize.VARIABLE_UP_TO_255_BYTE ? (stream.readByte() & 0xFF) : stream.readInt()); 
     361                        Definitions.MessageSize messageSizeEncoding = Definitions.MESSAGE_SIZES[opCode.ordinal()]; 
     362                        long messageSize = messageSizeEncoding == Definitions.MessageSize.FIXED ? -1 : 
     363                                           (messageSizeEncoding == Definitions.MessageSize.VARIABLE_UP_TO_255_BYTE ? (stream.readByte() & 0xFF) : stream.readInt()); 
    428364                        //long messageEncodingSize = messageSizeEncoding.ordinal() * messageSizeEncoding.ordinal(); // :-) 
    429365                        long commandStartPosition = stream.getAbsoluteReadPosition(); 
     
    436372 
    437373                        // Copy to structure buffer? 
    438                         if (opCode == OpCode.STRUCTURE_CREATE || opCode == OpCode.STRUCTURE_CHANGE || opCode == OpCode.STRUCTURE_DELETE) { 
     374                        /*if (opCode == Definitions.OpCode.STRUCTURE_CREATED || opCode == Definitions.OpCode.STRUCTURE_CHANGED || opCode == Definitions.OpCode.STRUCTURE_DELETED) { 
     375                            remoteRuntime.processStructure 
     376                            // TODO because of register entries this needs to be read now 
    439377                            structureBufferWriter.writeEnum(opCode); 
    440                             long remaining = (opCode == OpCode.STRUCTURE_DELETE) ? (TCPSettings.DEBUG_TCP ? 5 : 4) : messageSize; 
     378                            long remaining = (opCode == Definitions.OpCode.STRUCTURE_DELETED) ? (TCPSettings.DEBUG_TCP ? 5 : 4) : messageSize; 
    441379                            while (remaining > 0) { 
    442380                                long copy = Math.min(tempBuffer.length, remaining); 
     
    445383                                remaining -= copy; 
    446384                            } 
    447                         } else { 
    448                             try { 
    449                                 remotePart.processMessage(opCode, stream, remoteTypes, TCPConnection.this); 
    450                                 checkCommandEnd(stream); 
    451                             } catch (Exception e) { 
    452                                 Log.log(LogLevel.WARNING, this, "Failed to deserialize message of type " + opCode.toString() + ". Skipping. Reason: ", e); 
    453                                 long skip = nextCommandStartPosition - stream.getAbsoluteReadPosition(); 
     385                        } else {*/ 
     386                        try { 
     387                            remoteRuntime.processMessage(opCode, stream, TCPConnection.this, modelChanges); 
     388                            checkCommandEnd(stream); 
     389                        } catch (Exception e) { 
     390                            Log.log(LogLevel.WARNING, this, "Failed to deserialize message of type " + opCode.toString() + ". Skipping. Reason: ", e); 
     391                            long skip = nextCommandStartPosition - stream.getAbsoluteReadPosition(); 
     392                            if (skip >= 0) { 
     393                                stream.skip((int)skip); 
     394                            } else { 
     395                                skip = nextPacketStart - stream.getAbsoluteReadPosition(); 
    454396                                if (skip >= 0) { 
    455                                     stream.skip((int)skip); 
     397                                    Log.log(LogLevel.WARNING, this, "Too much stream content was read. This is not handled yet. Moving to next message batch."); // TODO 
    456398                                } else { 
    457                                     skip = nextPacketStart - stream.getAbsoluteReadPosition(); 
    458                                     if (skip >= 0) { 
    459                                         Log.log(LogLevel.WARNING, this, "Too much stream content was read. This is not handled yet. Moving to next message batch."); // TODO 
    460                                     } else { 
    461                                         Log.log(LogLevel.WARNING, this, "Too much stream content was read - exceeding message batch. This is not handled yet. Disconnecting."); // TODO 
    462                                     } 
     399                                    Log.log(LogLevel.WARNING, this, "Too much stream content was read - exceeding message batch. This is not handled yet. Disconnecting."); // TODO 
    463400                                } 
    464401                            } 
     402                            //} 
    465403                        } 
    466404                    } 
    467405 
     406                    if (!modelChanges.empty()) { 
     407                        getModelHandler().applyModelChanges(modelChanges); 
     408                        modelChanges = new BufferedModelChanges(); 
     409                    } 
     410 
     411                    /* 
    468412                    structureBufferWriter.flush(); 
    469413                    if (structureBuffer.getSize() > 0) { 
     
    475419                            @Override 
    476420                            public void run() { 
    477                                 remotePart.processStructurePacket(structureBufferToProcess, remoteTypes); 
     421                                remoteRuntime.processStructurePacket(structureBufferToProcess, remoteTypes); 
    478422                            } 
    479423                        }); 
    480                     } 
     424                    }*/ 
    481425                } 
    482426            } catch (Exception e) { 
     427                disconnected = true; 
    483428                if (e instanceof RuntimeException && e.getCause() != null) { 
    484429                    e = (Exception)e.getCause(); 
     
    489434            } 
    490435            try { 
    491                 remotePart.disconnect(); 
     436                remoteRuntime.disconnect(); 
    492437            } catch (Exception e) { 
    493438                Log.log(LogLevel.WARNING, this, e); 
     
    497442                stream.close(); 
    498443            } catch (Exception e) {} 
     444            disconnected = true; 
    499445        } 
    500446 
     
    525471//    } 
    526472 
    527     /** 
    528      * Notify (possibly wake-up) writer thread. Should be called whenever new tasks for the writer arrive. 
    529      */ 
     473    @Override 
    530474    public void notifyWriter() { 
    531475        Writer lockedWriter = writer; 
    532476        if (lockedWriter != null) { 
    533477            lockedWriter.notifyWriter(); 
    534         } 
    535     } 
    536  
    537     /** Class to store a pair - call and timeout - in a list */ 
    538     static class CallAndTimeout { 
    539  
    540         long timeoutTime; 
    541         AbstractCall call; 
    542  
    543         public CallAndTimeout(long timeoutTime, AbstractCall call) { 
    544             this.timeoutTime = timeoutTime; 
    545             this.call = call; 
    546478        } 
    547479    } 
     
    574506 
    575507        /** Queue with TCP commands waiting to be sent */ 
    576         private final WonderQueue<SerializedTCPCommand> tcpCallsToSend = new WonderQueue<SerializedTCPCommand>(); 
     508        private final WonderQueue<SerializedMessage> tcpCallsToSend = new WonderQueue<SerializedMessage>(); 
    577509 
    578510        /** List with calls that were not ready for sending yet */ 
     
    610542        public boolean canSend() { 
    611543            int maxNotAck = maxNotAcknowledgedPackets.getValue(); 
    612             return curPacketIndex < lastAcknowledgedPacket + maxNotAck || (maxNotAck <= 0 && curPacketIndex < lastAcknowledgedPacket + TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS); 
     544            return curPacketIndex < lastAcknowledgedPacket + maxNotAck || (maxNotAck <= 0 && curPacketIndex < lastAcknowledgedPacket + Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS); 
    613545        } 
    614546 
     
    626558                    if (disconnectSignal) { 
    627559                        try { 
    628                             remotePart.disconnect(); 
     560                            remoteRuntime.disconnect(); 
    629561                        } catch (Exception e) { 
    630562                            Log.log(LogLevel.WARNING, this, e); 
     
    679611 
    680612                    // send port data 
    681                     ArrayWrapper<TCPPort> it = monitoredPorts.getIterable(); 
     613                    ArrayWrapper<RemoteProxyPort> it = monitoredPorts.getIterable(); 
    682614                    byte changedFlag = 0; 
    683615                    for (int i = 0, n = it.size(); i < n; i++) { 
    684                         TCPPort pp = it.get(i); 
     616                        RemoteProxyPort pp = it.get(i); 
    685617                        if (pp != null && pp.getPort().isReady()) { 
    686618                            if (pp.getLastUpdate() + pp.getUpdateIntervalForNet() > startTime) { 
     
    737669                        writeBuffer.getBuffer().putShort(4, curPacketIndex & 0x7FFF); // TODO 
    738670                        lastAcknowledgementRequestTime = System.currentTimeMillis(); 
    739                         sentPacketTime[curPacketIndex & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS] = lastAcknowledgementRequestTime; 
     671                        sentPacketTime[curPacketIndex & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS] = lastAcknowledgementRequestTime; 
    740672                    } 
    741673 
     
    752684                } 
    753685            } catch (Exception e) { 
    754  
     686                disconnected = true; 
    755687                if (e instanceof RuntimeException && e.getCause() != null) { 
    756688                    e = (Exception)e.getCause(); 
     
    761693 
    762694                try { 
    763                     remotePart.disconnect(); 
     695                    remoteRuntime.disconnect(); 
    764696                } catch (Exception e2) { 
    765697                    Log.log(LogLevel.WARNING, this, e2); 
     
    770702                stream.close(); 
    771703            } catch (Exception e) {} 
     704            disconnected = true; 
    772705        } 
    773706 
     
    846779            } 
    847780 
    848             SerializedTCPCommand tcpCall = null; 
     781            SerializedMessage tcpCall = null; 
    849782            while ((tcpCall = tcpCallsToSend.dequeue()) != null) { 
    850783                tcpCall.serialize(stream); 
     
    874807            boolean expectsResponse = call.expectsResponse(); 
    875808            if (expectsResponse) { 
    876                 call.setCallId(remotePart.nextCallId.incrementAndGet()); 
    877             } 
    878  
    879             writeBufferStream.writeEnum(TCP.OpCode.RPC_CALL); 
     809                call.setCallId(remoteRuntime.getUniqueCallId()); 
     810            } 
     811 
     812            writeBufferStream.writeEnum(Definitions.OpCode.RPC_CALL); 
    880813            writeBufferStream.writeSkipOffsetPlaceholder(); 
    881814            writeBufferStream.writeInt(call.getRemotePortHandle()); 
     
    883816            call.serialize(writeBufferStream); 
    884817            if (TCPSettings.DEBUG_TCP) { 
    885                 writeBufferStream.writeByte(TCP.DEBUG_TCP_NUMBER); 
     818                writeBufferStream.writeByte(Definitions.DEBUG_TCP_NUMBER); 
    886819            } 
    887820            writeBufferStream.skipTargetHere(); 
     
    943876         * @param call Call object 
    944877         */ 
    945         public void sendCall(SerializedTCPCommand call) { 
     878        public void sendCall(SerializedMessage call) { 
    946879            //call.responsibleThread = -1; 
    947880            tcpCallsToSend.enqueue(call); 
     
    949882        } 
    950883    } 
    951  
    952     /** 
    953      * Checks whether any waiting calls have timed out. 
    954      * Removes any timed out calls from list. 
    955      * 
    956      * @return Are still calls waiting? 
    957      */ 
    958     public boolean checkWaitingCallsForTimeout(long timeNow) { 
    959         synchronized (callsAwaitingResponse) { 
    960             for (int i = 0; i < callsAwaitingResponse.size(); i++) { 
    961                 if (timeNow > callsAwaitingResponse.get(i).timeoutTime) { 
    962                     callsAwaitingResponse.get(i).call.setException(FutureStatus.TIMEOUT); 
    963                     callsAwaitingResponse.remove(i); 
    964                     i--; 
    965                 } 
    966             } 
    967             return callsAwaitingResponse.size() > 0; 
    968         } 
    969     } 
    970  
    971     /** 
    972      * Should be called regularly by monitoring thread to check whether critical 
    973      * ping time threshold is exceeded. 
    974      * 
    975      * @return Time the calling thread may wait before calling again (it futile to call this method before) 
    976      */ 
    977     public long checkPingForDisconnect() { 
    978         Writer lockedWriter = writer; 
    979         if (lockedWriter == null) { 
    980             return TCPSettings.getInstance().criticalPingThreshold.getValue(); 
    981         } 
    982         if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) { 
    983             long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS]; 
    984             long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis(); 
    985             if (timeLeft < 0) { 
    986                 handlePingTimeExceed(); 
    987                 return TCPSettings.getInstance().criticalPingThreshold.getValue(); 
    988             } 
    989             return timeLeft; 
    990         } else { 
    991             return TCPSettings.getInstance().criticalPingThreshold.getValue(); 
    992         } 
    993     } 
    994  
    995     /** 
    996      * Updates ping statistic variables 
    997      */ 
    998     private void updatePingStatistics() { 
    999         int result = 0; 
    1000         int resultAvg = 0; 
    1001         for (int i = 0; i < pingTimes.length; i++) { 
    1002             result = Math.max(result, pingTimes[i]); 
    1003             resultAvg += pingTimes[i]; 
    1004         } 
    1005         maxPingTime = result; 
    1006         avgPingTime = resultAvg / pingTimes.length; 
    1007     } 
    1008  
    1009     /** 
    1010      * @return Maximum ping time among last TCPSettings.AVG_PING_PACKETS packets 
    1011      */ 
    1012     public int getMaxPingTime() { 
    1013         return maxPingTime; 
    1014     } 
    1015  
    1016     /** 
    1017      * @return Average ping time among last TCPSettings.AVG_PING_PACKETS packets 
    1018      */ 
    1019     public int getAvgPingTime() { 
    1020         return avgPingTime; 
    1021     } 
    1022  
    1023     /** 
    1024      * @return Is critical ping time currently exceeded (possibly temporary disconnect) 
    1025      */ 
    1026     public boolean pingTimeExceeed() { 
    1027         Writer lockedWriter = writer; 
    1028         if (lockedWriter == null) { 
    1029             return false; 
    1030         } 
    1031         if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) { 
    1032             long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & TCPSettings.MAX_NOT_ACKNOWLEDGED_PACKETS]; 
    1033             long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis(); 
    1034             return timeLeft < 0; 
    1035         } else { 
    1036             return false; 
    1037         } 
    1038     } 
    1039  
    1040     /** 
    1041      * Called when critical ping time threshold was exceeded 
    1042      */ 
    1043     public void handlePingTimeExceed() {} // TODO 
    1044884 
    1045885//    /** 
     
    1092932//    } 
    1093933 
    1094     /** 
    1095      * Send call to connection partner 
    1096      * 
    1097      * @param call Call object 
    1098      */ 
     934    @Override 
    1099935    public void sendCall(AbstractCall call) { 
    1100936        Writer lockedWriter = writer; 
     
    1106942    } 
    1107943 
    1108     /** 
    1109      * Send serialized TCP call to connection partner 
    1110      * 
    1111      * @param call Call object 
    1112      */ 
    1113     public void sendCall(SerializedTCPCommand call) { 
     944    @Override 
     945    public void sendCall(SerializedMessage call) { 
    1114946        Writer lockedWriter = writer; 
    1115947        if (lockedWriter != null && (!disconnectSignal)) { 
     
    1118950    } 
    1119951 
     952    /** 
     953     * @return Is critical ping time currently exceeded (possibly temporary disconnect) 
     954     */ 
    1120955    @Override 
    1121     public void updateTimeChanged(DataTypeBase dt, short newUpdateTime) { 
    1122         // forward update time change to connection partner 
    1123         SerializedTCPCommand command = new SerializedTCPCommand(TCP.OpCode.TYPE_UPDATE, 8); 
    1124         command.getWriteStream().writeShort(dt == null ? -1 : dt.getUid()); 
    1125         command.getWriteStream().writeShort(newUpdateTime); 
    1126         sendCall(command); 
     956    public boolean pingTimeExceeed() { 
     957        Writer lockedWriter = writer; 
     958        if (lockedWriter == null) { 
     959            return false; 
     960        } 
     961        if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) { 
     962            long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS]; 
     963            long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis(); 
     964            return timeLeft < 0; 
     965        } else { 
     966            return false; 
     967        } 
     968    } 
     969 
     970    /** 
     971     * Should be called regularly by monitoring thread to check whether critical 
     972     * ping time threshold is exceeded. 
     973     * 
     974     * @return Time the calling thread may wait before calling again (it futile to call this method before) 
     975     */ 
     976    public long checkPingForDisconnect() { 
     977        Writer lockedWriter = writer; 
     978        if (lockedWriter == null) { 
     979            return TCPSettings.getInstance().criticalPingThreshold.getValue(); 
     980        } 
     981        if (lastAcknowledgedPacket != lockedWriter.curPacketIndex) { 
     982            long criticalPacketTime = sentPacketTime[(lastAcknowledgedPacket + 1) & Definitions.MAX_NOT_ACKNOWLEDGED_PACKETS]; 
     983            long timeLeft = criticalPacketTime + TCPSettings.getInstance().criticalPingThreshold.getValue() - System.currentTimeMillis(); 
     984            if (timeLeft < 0) { 
     985                handlePingTimeExceed(); 
     986                return TCPSettings.getInstance().criticalPingThreshold.getValue(); 
     987            } 
     988            return timeLeft; 
     989        } else { 
     990            return TCPSettings.getInstance().criticalPingThreshold.getValue(); 
     991        } 
     992    } 
     993 
     994    /** 
     995     * Writes all contents of write buffer to network and empties and resets it 
     996     */ 
     997    protected void writeBufferToNetwork() throws IOException { 
     998        writeBufferStream.flush(); 
     999        socketOutputStream.write(writeBuffer.getBuffer().getBuffer().array(), 0, writeBuffer.getSize()); 
     1000        writeBufferStream.reset(); 
     1001    } 
     1002 
     1003    @Override 
     1004    public ModelHandler getModelHandler() { 
     1005        return peer.connectionElement.getModelHandler(); 
     1006    } 
     1007 
     1008    /** 
     1009     * @return Whether connection has been disconnected - or is disconnected 
     1010     */ 
     1011    public boolean isDisconnected() { 
     1012        return disconnected; 
    11271013    } 
    11281014 
     
    12401126//    } 
    12411127 
    1242     /** 
    1243      * @return Data rate of bytes read from network (in bytes/s) 
    1244      */ 
    1245     public int getRx() { 
    1246         long lastTime = lastRxTimestamp; 
    1247         long lastPos = lastRxPosition; 
    1248         lastRxTimestamp = Time.getCoarse(); 
    1249         lastRxPosition = readBufferStream.getAbsoluteReadPosition(); 
    1250         if (lastTime == 0) { 
    1251             return 0; 
    1252         } 
    1253         if (lastRxTimestamp == lastTime) { 
    1254             return 0; 
    1255         } 
    1256  
    1257         double data = lastRxPosition - lastPos; 
    1258         double interval = (lastRxTimestamp - lastTime) / 1000; 
    1259         return (int)(data / interval); 
    1260     } 
    1261  
    1262     public String toString() { 
    1263         return description; 
    1264     } 
    1265  
    1266     /** 
    1267      * Writes all contents of write buffer to network and empties and resets it 
    1268      */ 
    1269     protected void writeBufferToNetwork() throws IOException { 
    1270         writeBufferStream.flush(); 
    1271         socketOutputStream.write(writeBuffer.getBuffer().getBuffer().array(), 0, writeBuffer.getSize()); 
    1272         writeBufferStream.reset(); 
    1273     } 
    1274  
    1275     /** 
    1276      * Subscribe to port changes on remote server 
    1277      * 
    1278      * @param index Port index in remote runtime 
    1279      * @param strategy Strategy to use/request 
    1280      * @param updateInterval Minimum interval in ms between notifications (values <= 0 mean: use server defaults) 
    1281      * @param localIndex Local Port Index 
    1282      * @param dataType DataType got from server 
    1283      * @param enc Data type to use when writing data to the network 
    1284      */ 
    1285     public void subscribe(int index, short strategy, boolean reversePush, short updateInterval, int localIndex, Serialization.DataEncoding enc) { 
    1286         SerializedTCPCommand command = new SerializedTCPCommand(TCP.OpCode.SUBSCRIBE, 16); 
    1287         command.getWriteStream().writeInt(index); 
    1288         command.getWriteStream().writeShort(strategy); 
    1289         command.getWriteStream().writeBoolean(reversePush); 
    1290         command.getWriteStream().writeShort(updateInterval); 
    1291         command.getWriteStream().writeInt(localIndex); 
    1292         command.getWriteStream().writeEnum(enc); 
    1293         sendCall(command); 
    1294     } 
    1295  
    1296     /** 
    1297      * Unsubscribe from port changes on remote server 
    1298      * 
    1299      * @param index Port index in remote runtime 
    1300      */ 
    1301     public void unsubscribe(int index) { 
    1302         SerializedTCPCommand command = new SerializedTCPCommand(TCP.OpCode.UNSUBSCRIBE, 8); 
    1303         command.getWriteStream().writeInt(index); 
    1304         sendCall(command); 
    1305     } 
    1306  
    1307     /** 
    1308      * @param portIndex Index of port 
    1309      * @return TCPPort for specified index 
    1310      */ 
    1311     protected TCPPort lookupPortForCallHandling(int portIndex) { 
    1312         AbstractPort ap = RuntimeEnvironment.getInstance().getPort(portIndex); 
    1313         TCPPort p = null; 
    1314         if (ap != null) { 
    1315             p = (TCPPort)ap.asNetPort(); 
    1316             assert(p != null); 
    1317         } 
    1318         return p; 
    1319     } 
    1320  
    1321     @Override 
    1322     public void sendResponse(AbstractCall responseToSend) { 
    1323         sendCall(responseToSend); 
    1324     } 
    13251128} 
  • internal/TCPPeer.java

    r111 r114  
    3636import org.finroc.core.FrameworkElement.ChildIterator; 
    3737import org.finroc.core.RuntimeSettings; 
    38 import org.finroc.core.datatype.FrameworkElementInfo; 
    3938import org.finroc.core.plugin.ExternalConnection; 
     39import org.finroc.core.remote.BufferedModelChanges; 
     40import org.finroc.core.remote.Definitions; 
    4041import org.finroc.core.remote.ModelNode; 
    4142import org.finroc.plugins.tcp.internal.TCP.PeerType; 
     
    7071 
    7172    /** Amount of structure this client is interested in */ 
    72     final FrameworkElementInfo.StructureExchange structureExchange; 
     73    final Definitions.StructureExchange structureExchange; 
    7374 
    7475    /** 
     
    115116     * @param serverListenAddress The address that server is supposed to listen on ("" will enable IPv6) 
    116117     */ 
    117     public TCPPeer(ExternalConnection frameworkElement, String peerName, TCP.PeerType peerType, FrameworkElementInfo.StructureExchange structureExchange, String networkConnection, 
     118    public TCPPeer(ExternalConnection frameworkElement, String peerName, TCP.PeerType peerType, Definitions.StructureExchange structureExchange, String networkConnection, 
    118119                   int preferredServerPort, boolean tryNextPortsIfOccupied, boolean autoConnectToAllPeers, String serverListenAddress) { 
    119120        this.connectionElement = frameworkElement; 
     
    244245            activelyConnect = true; 
    245246            modelRootNode = new ModelNode("TCP"); 
    246             connectionElement.getModelHandler().setModelRoot(modelRootNode); 
     247            BufferedModelChanges changes = new BufferedModelChanges(); 
     248            changes.setModelRoot(modelRootNode); 
     249            connectionElement.getModelHandler().applyModelChanges(changes); 
    247250 
    248251            if (address.length() > 0) { 
     
    368371            if (fe instanceof RemotePart && fe.isReady()) { 
    369372                RemotePart rs = (RemotePart)fe; 
    370                 if (!rs.peerInfo.connected) { 
     373                if (!rs.isConnected()) { 
    371374                    return 0; 
    372375                } 
     
    388391                if (fe instanceof RemotePart && fe.isReady()) { 
    389392                    RemotePart rs = (RemotePart)fe; 
    390                     if (!rs.peerInfo.connected) { 
     393                    if (!rs.isConnected()) { 
    391394                        continue; 
    392395                    } 
     
    411414     */ 
    412415    public boolean isAdminConnection() { 
    413         return structureExchange == FrameworkElementInfo.StructureExchange.FINSTRUCT; 
     416        return structureExchange == Definitions.StructureExchange.FINSTRUCT; 
    414417    } 
    415418 
     
    445448     * @param address IP address of remote part 
    446449     * @param neverForget Is this a remote peer to never forget? 
     450     * @param partnerHandleStampWidth Stamp Width of partner's framework element handles 
    447451     * @return Pointer to remote part 
    448452     */ 
    449     public RemotePart getRemotePart(UUID uuid, TCP.PeerType peerType, String peerName, InetAddress address, boolean neverForget) throws Exception { 
     453    public RemotePart getRemotePart(UUID uuid, TCP.PeerType peerType, String peerName, InetAddress address, boolean neverForget, int partnerHandleStampWidth) throws Exception { 
    450454        synchronized (connectTo) { 
    451455            if (uuid.equals(this.thisPeer.uuid)) { 
     
    457461                if (uuid.equals(info.uuid)) { 
    458462                    if (info.remotePart == null) { 
    459                         info.remotePart = new RemotePart(info, connectionElement, this); 
     463                        info.remotePart = new RemotePart(info, connectionElement, this, partnerHandleStampWidth); 
    460464                        info.remotePart.init(); 
    461465                    } 
     
    475479            info.name = peerName; 
    476480            info.neverForget = neverForget; 
    477             info.remotePart = new RemotePart(info, connectionElement, this); 
     481            info.remotePart = new RemotePart(info, connectionElement, this, partnerHandleStampWidth); 
    478482            info.remotePart.init(); 
    479483            return info.remotePart; 
     
    511515        } 
    512516        if (modelRootNode != null) { 
    513             connectionElement.getModelHandler().removeNode(modelRootNode); 
     517            BufferedModelChanges changes = new BufferedModelChanges(); 
     518            changes.removeNode(modelRootNode); 
     519            connectionElement.getModelHandler().applyModelChanges(changes); 
    514520        } 
    515521        modelRootNode = null; 
     
    522528     */ 
    523529    private void serializePeerInfo(BinaryOutputStream stream, PeerInfo peer) { 
    524         if ((peer == thisPeer || peer.connected) && peer.peerType != TCP.PeerType.CLIENT_ONLY) { 
     530        if ((peer == thisPeer || peer.isConnected()) && peer.peerType != TCP.PeerType.CLIENT_ONLY) { 
    525531            stream.writeBoolean(true); 
    526532 
     
    666672 
    667673                    for (PeerInfo info : otherPeers) { 
    668                         if ((!info.connected) && (!info.connecting) && (info.peerType != PeerType.CLIENT_ONLY)) { 
     674                        if ((!info.isConnected()) && (!info.connecting) && (info.peerType != PeerType.CLIENT_ONLY)) { 
    669675                            new PeerConnectorThread(info).start(); 
    670676                        } 
     
    689695            this.connectTo = connectTo; 
    690696            modelNode = new ModelNode("Looking for " + TCP.formatInetSocketAddress(connectTo) + "..."); 
    691             connectionElement.getModelHandler().addNode(modelRootNode, modelNode); 
     697            BufferedModelChanges changes = new BufferedModelChanges(); 
     698            changes.addNode(modelRootNode, modelNode); 
     699            connectionElement.getModelHandler().applyModelChanges(changes); 
    692700        } 
    693701 
     
    698706                stopThread(); 
    699707                if (modelRootNode != null) { 
    700                     connectionElement.getModelHandler().removeNode(modelNode); 
     708                    BufferedModelChanges changes = new BufferedModelChanges(); 
     709                    changes.removeNode(modelNode); 
     710                    connectionElement.getModelHandler().applyModelChanges(changes); 
    701711                } 
    702712                if (peerInfo != null) { 
     
    710720            try { 
    711721                socket.connect(connectTo); 
    712                 connection1 = new TCPConnection(TCPPeer.this, socket, TCP.BULK_DATA, false, modelNode); 
     722                connection1 = new TCPConnection(TCPPeer.this, socket, TCP.PRIMARY_CONNECTION | TCP.BULK_DATA, false, modelNode); 
    713723                //new TCPConnection(TCPPeer.this, socket, TCP.MANAGEMENT_DATA | TCP.EXPRESS_DATA, true, modelNode); 
    714724                socket = new Socket(); 
    715725                socket.connect(connectTo); 
    716726                //new TCPConnection(TCPPeer.this, socket, TCP.BULK_DATA, false, modelNode); 
    717                 connection2 = new TCPConnection(TCPPeer.this, socket, TCP.MANAGEMENT_DATA | TCP.EXPRESS_DATA, true, modelNode); 
     727                connection2 = new TCPConnection(TCPPeer.this, socket, TCP.EXPRESS_DATA, true, modelNode); 
    718728                peerInfo = findPeerInfoFor(connectTo); 
    719729                peerInfo.remotePart.initAndCheckForAdminPort(modelNode); 
     
    733743                    peerInfo.remotePart.deleteAllChildren(); 
    734744                } 
    735                 connectionElement.getModelHandler().changeNodeName(modelNode, "Looking for " + TCP.formatInetSocketAddress(connectTo) + "..."); 
    736                 //e.printStackTrace(); 
     745                BufferedModelChanges changes = new BufferedModelChanges(); 
     746                changes.changeNodeName(modelNode, "Looking for " + TCP.formatInetSocketAddress(connectTo) + "..."); 
     747                connectionElement.getModelHandler().applyModelChanges(changes); 
     748                e.printStackTrace(); 
    737749                socket.close(); 
    738750                // simply try again 
     
    756768            peer.connecting = true; 
    757769            modelNode = new ModelNode("Looking for " + peer.uuid.toString() + "..."); 
    758             connectionElement.getModelHandler().addNode(modelRootNode, modelNode); 
     770            BufferedModelChanges changes = new BufferedModelChanges(); 
     771            changes.addNode(modelRootNode, modelNode); 
     772            connectionElement.getModelHandler().applyModelChanges(changes); 
    759773        } 
    760774 
    761775        @Override 
    762776        public void mainLoopCallback() throws Exception { 
    763             if ((!activelyConnect) || peer.connected) { 
     777            if ((!activelyConnect) || peer.isConnected()) { 
    764778                stopThread(); 
    765779                if (modelRootNode != null) { 
    766                     connectionElement.getModelHandler().removeNode(modelNode); 
     780                    BufferedModelChanges changes = new BufferedModelChanges(); 
     781                    changes.removeNode(modelNode); 
     782                    connectionElement.getModelHandler().applyModelChanges(changes); 
    767783                } 
    768784                peer.connecting = false; 
     
    777793                TCPConnection connection1 = null, connection2 = null; 
    778794                try { 
    779                     if (peer.remotePart == null || peer.remotePart.bulkConnection == null) { 
     795                    RemotePart remotePart = peer.remotePart; 
     796                    if (remotePart != null && (!remotePart.isConnected())) { 
     797                        continue; // disconnecting - wait 
     798                    } 
     799 
     800                    if (remotePart == null || remotePart.getPrimaryConnection() == null) { 
    780801                        socket.connect(socketAddress); 
    781802                        //new TCPConnection(TCPPeer.this, socket, TCP.MANAGEMENT_DATA | TCP.EXPRESS_DATA, false, modelNode); 
    782                         connection1 = new TCPConnection(TCPPeer.this, socket, TCP.BULK_DATA, false, modelNode); 
    783                     } 
    784                     if (peer.remotePart == null || peer.remotePart.managementConnection == null) { 
     803                        connection1 = new TCPConnection(TCPPeer.this, socket, TCP.PRIMARY_CONNECTION | TCP.BULK_DATA, false, modelNode); 
     804                    } 
     805                    if (peer.remotePart == null || peer.remotePart.getExpressConnection() == null) { 
    785806                        socket = new Socket(); 
    786807                        socket.connect(socketAddress); 
    787808                        //new TCPConnection(TCPPeer.this, socket, TCP.BULK_DATA, false, modelNode); 
    788                         connection2 = new TCPConnection(TCPPeer.this, socket, TCP.MANAGEMENT_DATA | TCP.EXPRESS_DATA, false, modelNode); 
     809                        connection2 = new TCPConnection(TCPPeer.this, socket, TCP.EXPRESS_DATA, false, modelNode); 
    789810                    } 
    790811                    peer.remotePart.initAndCheckForAdminPort(modelNode); 
     
    816837                        peer.remotePart.deleteAllChildren(); 
    817838                    } 
    818                     connectionElement.getModelHandler().changeNodeName(modelNode, "Looking for " + peer.uuid.toString() + "..."); 
     839                    BufferedModelChanges changes = new BufferedModelChanges(); 
     840                    changes.changeNodeName(modelNode, "Looking for " + peer.uuid.toString() + "..."); 
     841                    connectionElement.getModelHandler().applyModelChanges(changes); 
    819842                    socket.close(); 
    820843                    // simply try again 
Note: See TracChangeset for help on using the changeset viewer.