diff --git a/common/src/main/java/org/tron/common/parameter/CommonParameter.java b/common/src/main/java/org/tron/common/parameter/CommonParameter.java index 3fe1e878ffb..13393d96081 100644 --- a/common/src/main/java/org/tron/common/parameter/CommonParameter.java +++ b/common/src/main/java/org/tron/common/parameter/CommonParameter.java @@ -141,6 +141,37 @@ public class CommonParameter { @Getter @Setter public int minParticipationRate; + // -- Tor anonymous transaction broadcast -- + // When enabled, locally-originated transactions are sent through the Tor SOCKS5 proxy + // to the HTTP broadcast API of standard (unmodified) TRON nodes instead of being advertised + // to P2P peers, hiding the originating node's IP. + @Getter + @Setter + public boolean torBroadcastEnable = false; + @Getter + @Setter + public String torSocksHost = "127.0.0.1"; + @Getter + @Setter + public int torSocksPort = 9050; + @Getter + @Setter + public int torConnectTimeout = 30000; + @Getter + @Setter + public int torReadTimeout = 30000; + @Getter + @Setter + public int torBroadcastCount = 2; + @Getter + @Setter + public boolean torCircuitIsolation = true; + @Getter + @Setter + public int torControlPort = 0; + @Getter + @Setter + public String torControlPassword = ""; @Getter public P2pConfig p2pConfig; @Getter @@ -426,7 +457,9 @@ public class CommonParameter { // from clearParam(), consistent with mainnet.conf public List passiveNodes = new ArrayList<>(); @Getter - public List fastForwardNodes; // clearParam: new ArrayList<>() + // Default to an empty list (never null) so consumers that capture it at class-load time + // (e.g. PeerConnection's static relayNodes) can't capture a null before Args is initialised. + public List fastForwardNodes = new ArrayList<>(); @Getter public int maxFastForwardNum; // clearParam: 4 @Getter diff --git a/common/src/main/java/org/tron/core/config/args/NodeConfig.java b/common/src/main/java/org/tron/core/config/args/NodeConfig.java index ea9f26a06a0..80d31c67ac5 100644 --- a/common/src/main/java/org/tron/core/config/args/NodeConfig.java +++ b/common/src/main/java/org/tron/core/config/args/NodeConfig.java @@ -117,6 +117,73 @@ public class NodeConfig { private List fastForward = new ArrayList<>(); private List disabledApi = new ArrayList<>(); + // node.tor.* — anonymous outbound transaction broadcast over the Tor SOCKS5 proxy. + // Read manually in fromConfig() so an absent [node.tor] block keeps prior behaviour + // and no reference.conf defaults are required. + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private boolean torBroadcastEnable = false; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private String torSocksHost = "127.0.0.1"; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torSocksPort = 9050; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torConnectTimeout = 30000; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torReadTimeout = 30000; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torBroadcastCount = 2; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private boolean torCircuitIsolation = true; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private int torControlPort = 0; + @Getter(lombok.AccessLevel.NONE) + @Setter(lombok.AccessLevel.NONE) + private String torControlPassword = ""; + + public boolean isTorBroadcastEnable() { + return torBroadcastEnable; + } + + public String getTorSocksHost() { + return torSocksHost; + } + + public int getTorSocksPort() { + return torSocksPort; + } + + public int getTorConnectTimeout() { + return torConnectTimeout; + } + + public int getTorReadTimeout() { + return torReadTimeout; + } + + public int getTorBroadcastCount() { + return torBroadcastCount; + } + + public boolean isTorCircuitIsolation() { + return torCircuitIsolation; + } + + public int getTorControlPort() { + return torControlPort; + } + + public String getTorControlPassword() { + return torControlPassword; + } + // ---- Sub-object fields ---- private P2pConfig p2p = new P2pConfig(); private HttpConfig http = new HttpConfig(); @@ -394,6 +461,17 @@ public static NodeConfig fromConfig(Config config) { + "Please use [node.allowShieldedTransactionApi] instead."); } + // node.tor.* — read manually so an absent block is a no-op (feature stays off) + nc.torBroadcastEnable = getBool(section, "tor.enabled", false); + nc.torSocksHost = getString(section, "tor.socksHost", "127.0.0.1"); + nc.torSocksPort = getInt(section, "tor.socksPort", 9050); + nc.torConnectTimeout = getInt(section, "tor.connectTimeout", 30000); + nc.torReadTimeout = getInt(section, "tor.readTimeout", 30000); + nc.torBroadcastCount = getInt(section, "tor.broadcastCount", 2); + nc.torCircuitIsolation = getBool(section, "tor.circuitIsolation", true); + nc.torControlPort = getInt(section, "tor.controlPort", 0); + nc.torControlPassword = getString(section, "tor.controlPassword", ""); + // node.shutdown.* — PascalCase keys (BlockTime, BlockHeight), cannot auto-bind nc.shutdownBlockTime = config.hasPath("node.shutdown.BlockTime") ? config.getString("node.shutdown.BlockTime") : ""; diff --git a/framework/src/main/java/org/tron/core/Wallet.java b/framework/src/main/java/org/tron/core/Wallet.java index 0482643d8d0..67852d73b5f 100755 --- a/framework/src/main/java/org/tron/core/Wallet.java +++ b/framework/src/main/java/org/tron/core/Wallet.java @@ -189,6 +189,7 @@ import org.tron.core.net.TronNetDelegate; import org.tron.core.net.TronNetService; import org.tron.core.net.message.adv.TransactionMessage; +import org.tron.core.net.service.tor.TorBroadcastService; import org.tron.core.store.AccountIdIndexStore; import org.tron.core.store.AccountStore; import org.tron.core.store.AccountTraceStore; @@ -277,6 +278,8 @@ public class Wallet { @Autowired private TronNetService tronNetService; @Autowired + private TorBroadcastService torBroadcastService; + @Autowired private TronNetDelegate tronNetDelegate; @Autowired private Manager dbManager; @@ -556,9 +559,24 @@ public GrpcAPI.Return broadcastTransaction(Transaction signedTransaction) { if (trx.getInstance().getRawData().getContractCount() == 0) { throw new ContractValidateException(ActuatorConstant.CONTRACT_NOT_EXIST); } - TransactionMessage message = new TransactionMessage(trx.getInstance().toByteArray()); trx.checkExpiration(chainBaseManager.getNextBlockSlotTime()); dbManager.pushTransaction(trx); + + if (CommonParameter.getInstance().isTorBroadcastEnable()) { + // Anonymous path: the transaction is validated and kept locally, but instead of being + // advertised to P2P peers (which would expose this node's IP as the origin) it is relayed + // through the Tor SOCKS5 proxy to standard TRON nodes, which broadcast it to the network. + int relayed = torBroadcastService.broadcastTransaction(signedTransaction); + if (relayed == 0) { + return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION) + .setMessage(ByteString.copyFromUtf8("Tor broadcast failed.")).build(); + } + logger.info("Broadcast transaction {} via Tor to {} relay nodes successfully.", + txID, relayed); + return builder.setResult(true).setCode(response_code.SUCCESS).build(); + } + + TransactionMessage message = new TransactionMessage(trx.getInstance().toByteArray()); int num = tronNetService.fastBroadcastTransaction(message); if (num == 0 && minEffectiveConnection != 0) { return builder.setResult(false).setCode(response_code.NOT_ENOUGH_EFFECTIVE_CONNECTION) diff --git a/framework/src/main/java/org/tron/core/config/args/Args.java b/framework/src/main/java/org/tron/core/config/args/Args.java index 2d6660f9a6a..f1a5a9bf0fb 100644 --- a/framework/src/main/java/org/tron/core/config/args/Args.java +++ b/framework/src/main/java/org/tron/core/config/args/Args.java @@ -640,6 +640,17 @@ private static void applyNodeConfig(NodeConfig nc) { PARAMETER.unsolidifiedBlockCheck = nc.isUnsolidifiedBlockCheck(); PARAMETER.maxUnsolidifiedBlocks = nc.getMaxUnsolidifiedBlocks(); + // ---- Tor anonymous transaction broadcast ---- + PARAMETER.torBroadcastEnable = nc.isTorBroadcastEnable(); + PARAMETER.torSocksHost = nc.getTorSocksHost(); + PARAMETER.torSocksPort = nc.getTorSocksPort(); + PARAMETER.torConnectTimeout = nc.getTorConnectTimeout(); + PARAMETER.torReadTimeout = nc.getTorReadTimeout(); + PARAMETER.torBroadcastCount = nc.getTorBroadcastCount(); + PARAMETER.torCircuitIsolation = nc.isTorCircuitIsolation(); + PARAMETER.torControlPort = nc.getTorControlPort(); + PARAMETER.torControlPassword = nc.getTorControlPassword(); + // disabledApi list — lowercase normalization PARAMETER.disabledApiList = nc.getDisabledApi().isEmpty() ? Collections.emptyList() diff --git a/framework/src/main/java/org/tron/core/net/TronNetService.java b/framework/src/main/java/org/tron/core/net/TronNetService.java index 8b97c8d9f4d..14ec7b053f3 100644 --- a/framework/src/main/java/org/tron/core/net/TronNetService.java +++ b/framework/src/main/java/org/tron/core/net/TronNetService.java @@ -31,6 +31,7 @@ import org.tron.core.net.service.sync.SyncService; import org.tron.p2p.P2pConfig; import org.tron.p2p.P2pService; +import org.tron.p2p.discover.Node; import org.tron.p2p.utils.NetUtil; @Slf4j(topic = "net") @@ -138,6 +139,15 @@ public int fastBroadcastTransaction(TransactionMessage msg) { return advService.fastBroadcastTransaction(msg); } + /** + * Nodes discovered via the (UDP) discovery layer. Only a subset of these is actually connected + * as sync peers; the rest are known-but-unconnected and are used as anonymous Tor broadcast + * targets (see TorBroadcastService). + */ + public List getTableNodes() { + return p2pService.getTableNodes(); + } + public static boolean hasIpv4Stack(Set ipSet) { for (String ip : ipSet) { InetAddress inetAddress; diff --git a/framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java b/framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java new file mode 100644 index 00000000000..18d82e27972 --- /dev/null +++ b/framework/src/main/java/org/tron/core/net/service/tor/TorBroadcastService.java @@ -0,0 +1,813 @@ +package org.tron.core.net.service.tor; + +import com.google.protobuf.ByteString; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import javax.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.tron.common.math.StrictMathWrapper; +import org.tron.common.parameter.CommonParameter; +import org.tron.common.utils.Sha256Hash; +import org.tron.core.capsule.TransactionCapsule; +import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.TronNetService; +import org.tron.core.net.message.MessageTypes; +import org.tron.core.net.message.adv.FetchInvDataMessage; +import org.tron.core.net.message.adv.InventoryMessage; +import org.tron.core.net.message.adv.TransactionsMessage; +import org.tron.core.net.peer.PeerConnection; +import org.tron.p2p.base.Parameter; +import org.tron.p2p.connection.business.upgrade.UpgradeController; +import org.tron.p2p.connection.message.MessageType; +import org.tron.p2p.connection.message.keepalive.PongMessage; +import org.tron.p2p.discover.Node; +import org.tron.p2p.protos.Connect; +import org.tron.p2p.utils.NetUtil; +import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; +import org.tron.protos.Protocol.Transaction; + +/** + * Broadcasts locally-originated transactions anonymously over Tor, using the native TRON P2P + * protocol so the receiving nodes are standard, unmodified TRON full nodes. + * + *

When {@code node.tor.enabled} is true, a transaction submitted to this node's own broadcast + * API is validated locally as usual but, instead of being advertised to the node's directly + * connected sync peers (which would expose its IP as the origin), it is relayed through the local + * Tor SOCKS5 proxy to standard nodes learned via discovery that are NOT current sync peers, hiding + * the origin. Block sync and normal P2P traffic keep running in the clear. + * + *

Reaching a Tor peer and completing the P2P handshake costs several seconds, which is paid per + * connection, not per transaction. To make broadcasting fast even at a high transaction rate the + * service keeps a pool of persistent Tor tunnels to verified, at-head relay nodes: each + * tunnel is handshaked once, kept alive (answering keep-alive pings), reused for every broadcast, + * and rebuilt only when it breaks. Transactions are buffered and flushed as a single + * {@code TransactionsMessage} batch over the tunnels, so a burst of transactions is delivered in + * roughly one round-trip. {@code broadcastTransaction} enqueues and returns immediately. + */ +@Slf4j(topic = "net") +@Component +public class TorBroadcastService { + + private static final byte HELLO_TYPE = MessageType.HANDSHAKE_HELLO.getType(); + private static final byte PING_TYPE = MessageType.KEEP_ALIVE_PING.getType(); + private static final int SOCKS_VERSION = 0x05; + private static final int SOCKS_CMD_CONNECT = 0x01; + private static final int SOCKS_AUTH_NONE = 0x00; + private static final int SOCKS_AUTH_USERPASS = 0x02; + + private static final int POOL_SIZE = 6; + private static final int MAINTAIN_INTERVAL_MS = 3000; + private static final int FLUSH_INTERVAL_MS = 150; + private static final int MAX_BATCH = 300; + // How many connect attempts to launch per maintenance round: just enough to refill the missing + // tunnels plus a small margin for the ones that fail over Tor — NOT a fixed large batch, so we + // don't build (and immediately close) far more circuits than needed. + private static final int CONNECT_MARGIN = 2; + // When a round can't refill the pool (Tor congested/down), skip up to 2^level rounds before the + // next attempt, so we back off instead of flooding a struggling Tor with circuit requests. + private static final int MAX_BACKOFF_LEVEL = 4; + // A relay is only kept if its advertised head is within this many blocks of ours, so it is a + // well-connected node that actually propagates the transaction rather than sitting on it. + private static final long HEAD_TOLERANCE = 200; + + @Autowired + private TronNetService tronNetService; + @Autowired + private TronNetDelegate tronNetDelegate; + + private final ExecutorService connectExecutor = Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "tor-relay-connect"); + t.setDaemon(true); + return t; + }); + private volatile ScheduledExecutorService scheduler; + + private final List channels = new CopyOnWriteArrayList<>(); + private final ConcurrentLinkedQueue buffer = new ConcurrentLinkedQueue<>(); + private final Set buffered = ConcurrentHashMap.newKeySet(); + + // If the pool stays empty for this many maintenance rounds, ask Tor for fresh circuits. + private static final int NEWNYM_AFTER_ROUNDS = 3; + private static final long NEWNYM_MIN_INTERVAL_MS = 15000; + private int emptyMaintainRounds = 0; + private long lastNewnymMs = 0; + // Exponential backoff so a struggling Tor isn't flooded with connect attempts. + private int backoffLevel = 0; + private int skipsRemaining = 0; + + /** + * Enqueue a transaction for anonymous relay over Tor and return immediately. Delivery happens + * asynchronously over the persistent tunnel pool. + * + * @return the number of relays the transaction will be pushed to (0 only if it cannot run) + */ + public int broadcastTransaction(Transaction transaction) { + ensureStarted(); + Sha256Hash txId = new TransactionCapsule(transaction).getTransactionId(); + if (buffered.add(txId)) { + buffer.add(transaction); + } + // Async: report the intended relay count so the caller sees success; the flusher delivers it. + return StrictMathWrapper.max(1, CommonParameter.getInstance().getTorBroadcastCount()); + } + + /** Stop the background maintenance/flush tasks and close all tunnels. */ + @PreDestroy + public void shutdown() { + ScheduledExecutorService svc = scheduler; + if (svc != null) { + svc.shutdownNow(); + } + connectExecutor.shutdownNow(); + for (RelayChannel c : channels) { + c.close(); + } + channels.clear(); + buffer.clear(); + buffered.clear(); + } + + private void ensureStarted() { + if (scheduler != null) { + return; + } + synchronized (this) { + if (scheduler != null) { + return; + } + ScheduledExecutorService svc = Executors.newScheduledThreadPool(2, r -> { + Thread t = new Thread(r, "tor-relay-pool"); + t.setDaemon(true); + return t; + }); + svc.scheduleWithFixedDelay(this::maintainChannels, 0, MAINTAIN_INTERVAL_MS, + TimeUnit.MILLISECONDS); + svc.scheduleWithFixedDelay(this::flush, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, + TimeUnit.MILLISECONDS); + scheduler = svc; + } + } + + // ---- Pool maintenance: keep POOL_SIZE persistent tunnels to at-head, non-sync relays ---------- + + private void maintainChannels() { + try { + CommonParameter parameter = CommonParameter.getInstance(); + if (!parameter.isTorBroadcastEnable()) { + return; + } + channels.removeIf(c -> { + if (!c.alive) { + c.close(); + return true; + } + return false; + }); + int need = POOL_SIZE - channels.size(); + if (need <= 0) { + emptyMaintainRounds = 0; + backoffLevel = 0; + skipsRemaining = 0; + return; + } + // Back off: after failed rounds, skip a growing number of ticks so a congested/down Tor is + // not flooded with connect attempts (which only clog its circuit-build queue further). + if (skipsRemaining > 0) { + skipsRemaining--; + return; + } + Set exclude = activeSyncPeerAddresses(); + for (RelayChannel c : channels) { + exclude.add(c.address); + } + List candidates = new ArrayList<>(); + for (Node node : discoveredNonSyncNodes(activeSyncPeerAddresses())) { + InetSocketAddress a = node.getPreferInetSocketAddress(); + if (a != null && a.getAddress() != null && !exclude.contains(a.getAddress())) { + candidates.add(node); + } + } + Collections.shuffle(candidates, ThreadLocalRandom.current()); + + // Launch only enough attempts to refill the missing tunnels (plus a small margin), and keep + // every one that succeeds — never build extra circuits just to close them. + int attempts = StrictMathWrapper.min(need + CONNECT_MARGIN, candidates.size()); + List> futures = new ArrayList<>(); + for (int i = 0; i < attempts; i++) { + Node node = candidates.get(i); + futures.add(connectExecutor.submit(() -> connectChannel(node, parameter))); + } + int added = 0; + for (Future f : futures) { + try { + RelayChannel c = f.get(parameter.getTorReadTimeout() + 3000L, TimeUnit.MILLISECONDS); + if (c != null) { + channels.add(c); + added++; + } + } catch (Exception ignored) { + // connect failed / timed out + } + } + if (added > 0) { + logger.info("Tor relay pool: {} persistent tunnel(s) open.", channels.size()); + } + if (added >= need) { + // pool refilled: clear the backoff + backoffLevel = 0; + skipsRemaining = 0; + emptyMaintainRounds = 0; + } else { + // couldn't refill: grow the backoff (skip 1, 3, 7, 15 ... of the next rounds) + backoffLevel = StrictMathWrapper.min(backoffLevel + 1, MAX_BACKOFF_LEVEL); + skipsRemaining = (1 << backoffLevel) - 1; + // If no tunnel exists at all for several attempts, Tor's circuits/exits may be throttled; + // ask Tor for a fresh identity (new exit IPs). This is our lever against a systemic outage. + if (channels.isEmpty() && ++emptyMaintainRounds >= NEWNYM_AFTER_ROUNDS) { + emptyMaintainRounds = 0; + signalNewnym(parameter); + } + } + } catch (Throwable t) { + // Never let the scheduled maintainer die: a thrown Error would stop it being rescheduled. + logger.warn("Tor relay pool maintenance failed: {}", t.toString()); + } + } + + // Ask the Tor daemon (via its ControlPort) to build fresh circuits with new exit relays, so a + // node whose current exit IPs are throttled can recover without a manual Tor restart. No-op if + // no ControlPort is configured. Rate-limited, as Tor itself throttles NEWNYM. + private void signalNewnym(CommonParameter parameter) { + int controlPort = parameter.getTorControlPort(); + if (controlPort <= 0) { + return; + } + long now = System.currentTimeMillis(); + if (now - lastNewnymMs < NEWNYM_MIN_INTERVAL_MS) { + return; + } + lastNewnymMs = now; + try (Socket socket = new Socket()) { + socket.connect(new InetSocketAddress(parameter.getTorSocksHost(), controlPort), + parameter.getTorConnectTimeout()); + socket.setSoTimeout(parameter.getTorReadTimeout()); + OutputStream out = socket.getOutputStream(); + InputStream in = socket.getInputStream(); + out.write(("AUTHENTICATE \"" + parameter.getTorControlPassword() + "\"\r\n") + .getBytes(StandardCharsets.US_ASCII)); + out.flush(); + String authReply = readControlLine(in); + if (authReply == null || !authReply.startsWith("250")) { + logger.warn("Tor ControlPort authentication failed: {}", authReply); + return; + } + out.write("SIGNAL NEWNYM\r\n".getBytes(StandardCharsets.US_ASCII)); + out.flush(); + String signalReply = readControlLine(in); + if (signalReply != null && signalReply.startsWith("250")) { + logger.info("Tor relay pool empty; signalled NEWNYM to rebuild circuits (new exit IPs)."); + } else { + logger.warn("Tor NEWNYM signal was rejected: {}", signalReply); + } + } catch (Exception e) { + logger.warn("Tor NEWNYM signal failed: {}", e.getMessage()); + } + } + + private static String readControlLine(InputStream in) throws IOException { + StringBuilder sb = new StringBuilder(); + int c; + while ((c = in.read()) != -1) { + if (c == '\n') { + break; + } + if (c != '\r') { + sb.append((char) c); + } + } + return sb.length() == 0 ? null : sb.toString(); + } + + private RelayChannel connectChannel(Node node, CommonParameter parameter) { + InetSocketAddress dest = node.getPreferInetSocketAddress(); + if (dest == null || dest.getAddress() == null) { + return null; + } + Socket socket = null; + try { + socket = openTunnel(dest, "relay-" + ThreadLocalRandom.current().nextLong(), parameter); + RelayChannel channel = new RelayChannel(dest.getAddress(), socket, parameter); + if (!channel.establish()) { + channel.close(); + return null; + } + return channel; + } catch (Exception e) { + closeQuietly(socket); + return null; + } + } + + // ---- Flush: deliver the buffered transactions as one batch over the tunnels ------------------- + + private void flush() { + try { + if (buffer.isEmpty()) { + return; + } + dropExpired(); + if (buffer.isEmpty()) { + return; + } + List alive = new ArrayList<>(); + for (RelayChannel c : channels) { + if (c.alive) { + alive.add(c); + } + } + if (alive.isEmpty()) { + return; // no tunnels yet; keep the transactions buffered until the pool is ready + } + List batch = new ArrayList<>(); + Transaction tx; + while (batch.size() < MAX_BATCH && (tx = buffer.poll()) != null) { + batch.add(tx); + } + if (batch.isEmpty()) { + return; + } + CommonParameter parameter = CommonParameter.getInstance(); + int target = StrictMathWrapper.max(1, + StrictMathWrapper.min(parameter.getTorBroadcastCount(), alive.size())); + List chosen = alive.subList(0, target); + // A socket write has no timeout of its own (setSoTimeout only affects reads), so a tunnel + // whose Tor circuit has died silently would block the advertise() call — and thus the whole + // flusher — forever. Run each advertise on the connect pool and bound it: if it stalls, close + // the tunnel (which unblocks the write) so the maintainer rebuilds it, and keep flushing. + long writeTimeout = StrictMathWrapper.max(2000L, parameter.getTorReadTimeout()); + List> futures = new ArrayList<>(); + for (RelayChannel channel : chosen) { + futures.add(connectExecutor.submit(() -> channel.advertise(batch))); + } + int delivered = 0; + for (int i = 0; i < futures.size(); i++) { + try { + if (Boolean.TRUE.equals(futures.get(i).get(writeTimeout, TimeUnit.MILLISECONDS))) { + delivered++; + } + } catch (Exception e) { + futures.get(i).cancel(true); + chosen.get(i).close(); // stalled/broken tunnel: drop it, the maintainer will replace it + } + } + if (delivered == 0) { + // every chosen tunnel failed to write; re-buffer for the next flush (no transaction lost) + for (Transaction t : batch) { + buffer.add(t); + } + } else { + for (Transaction t : batch) { + buffered.remove(new TransactionCapsule(t).getTransactionId()); + } + logger.info("Tor broadcast: flushed {} transaction(s) to {} relay tunnel(s).", + batch.size(), delivered); + } + } catch (Throwable t) { + // Never let the scheduled flusher die: a thrown Error would stop it being rescheduled. + logger.warn("Tor broadcast flush failed: {}", t.toString()); + } + } + + // Drop transactions whose expiration has passed: they can no longer be included even if relayed, + // so keeping them wastes tunnel writes and, during a long Tor outage, would grow the buffer + // without bound. Removing them here also frees the dedup set. + private void dropExpired() { + long now = System.currentTimeMillis(); + int dropped = 0; + Iterator it = buffer.iterator(); + while (it.hasNext()) { + Transaction tx = it.next(); + long expiration = tx.getRawData().getExpiration(); + if (expiration > 0 && expiration < now) { + it.remove(); + buffered.remove(new TransactionCapsule(tx).getTransactionId()); + dropped++; + } + } + if (dropped > 0) { + logger.warn("Tor broadcast: dropped {} expired transaction(s) not relayed before expiry.", + dropped); + } + } + + private List discoveredNonSyncNodes(Set syncPeers) { + List result = new ArrayList<>(); + for (Node node : tronNetService.getTableNodes()) { + InetSocketAddress address = node.getPreferInetSocketAddress(); + if (address != null && address.getAddress() != null + && !syncPeers.contains(address.getAddress())) { + result.add(node); + } + } + return result; + } + + private Set activeSyncPeerAddresses() { + Set set = new HashSet<>(); + for (PeerConnection peer : tronNetDelegate.getActivePeer()) { + InetAddress address = peer.getInetAddress(); + if (address != null) { + set.add(address); + } + } + return set; + } + + private long ourHeadNum() { + try { + return tronNetDelegate.getHeadBlockId().getNum(); + } catch (Exception e) { + return -1; // unknown (e.g. in tests) -> skip the at-head filter + } + } + + // ---- A persistent, handshaked Tor tunnel to one relay node ----------------------------------- + + private final class RelayChannel { + + private final InetAddress address; + private final Socket socket; + private final InputStream in; + private final OutputStream out; + private final CommonParameter parameter; + private int peerVersion; + private volatile boolean alive = true; + private final Map pendingFetch = new ConcurrentHashMap<>(); + private Thread reader; + + RelayChannel(InetAddress address, Socket socket, CommonParameter parameter) throws IOException { + this.address = address; + this.socket = socket; + this.parameter = parameter; + this.in = socket.getInputStream(); + this.out = socket.getOutputStream(); + } + + // Complete the transport + application handshake and start the keep-alive/serve reader. + boolean establish() throws Exception { + int networkId = parameter.getNodeP2pVersion(); + peerVersion = transportHandshake(in, out, networkId); + Protocol.HelloMessage peerAppHello = readAppHello(in, peerVersion); + if (peerAppHello == null) { + return false; + } + long ourHead = ourHeadNum(); + if (ourHead >= 0 && peerAppHello.getHeadBlockId().getNumber() < ourHead - HEAD_TOLERANCE) { + return false; // lagging relay, poor propagator + } + // Echo the peer's genesis/solid/head so it marks the connection sync-complete (needed before + // it will fetch a transaction inventory from us). + org.tron.protos.Discover.Endpoint appFrom = org.tron.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom(randomAnonIp().getBytes(StandardCharsets.UTF_8))) + .setPort(18888) + .build(); + Protocol.HelloMessage appHello = Protocol.HelloMessage.newBuilder() + .setFrom(appFrom) + .setVersion(networkId) + .setTimestamp(System.currentTimeMillis()) + .setGenesisBlockId(peerAppHello.getGenesisBlockId()) + .setSolidBlockId(peerAppHello.getSolidBlockId()) + .setHeadBlockId(peerAppHello.getHeadBlockId()) + .setNodeType(0) + .setLowestBlockNum(0) + .build(); + sendApp(prependType(MessageTypes.P2P_HELLO.asByte(), appHello.toByteArray())); + // Handshake done. The peer keep-alive-pings only every KEEP_ALIVE_TIMEOUT (20s), so relax the + // read timeout well above that (plus Tor jitter); otherwise an idle but healthy tunnel would + // time out between pings and be dropped, churning the pool and flooding Tor with reconnects. + int minIdle = 3 * Parameter.KEEP_ALIVE_TIMEOUT; + socket.setSoTimeout(StrictMathWrapper.max(parameter.getTorReadTimeout(), minIdle)); + reader = new Thread(this::readLoop, "tor-relay-reader"); + reader.setDaemon(true); + reader.start(); + return true; + } + + private void readLoop() { + try { + while (alive) { + byte[] decoded = UpgradeController.decodeReceiveData(peerVersion, readFrame(in)); + if (decoded.length == 0) { + continue; + } + byte type = decoded[0]; + if (type == PING_TYPE) { + sendApp(new PongMessage().getSendData()); // keep the tunnel alive + } else if (type == MessageTypes.FETCH_INV_DATA.asByte()) { + FetchInvDataMessage fetch = + new FetchInvDataMessage(subarray(decoded, 1, decoded.length)); + List serve = new ArrayList<>(); + for (Sha256Hash hash : fetch.getHashList()) { + Transaction t = pendingFetch.remove(hash); + if (t != null) { + serve.add(t); + } + } + if (!serve.isEmpty()) { + sendApp(new TransactionsMessage(serve).getSendBytes()); + } + } + // ignore everything else (the peer's own INV, status, sync requests, ...) + } + } catch (Exception e) { + // connection closed or broken; the maintainer will rebuild it + } finally { + alive = false; + closeQuietly(socket); + } + } + + // Advertise a batch of transactions; the reader serves them when the peer sends FetchInvData. + boolean advertise(List batch) { + try { + List ids = new ArrayList<>(batch.size()); + for (Transaction t : batch) { + Sha256Hash id = new TransactionCapsule(t).getTransactionId(); + pendingFetch.put(id, t); + ids.add(id); + } + sendApp(new InventoryMessage(ids, InventoryType.TRX).getSendBytes()); + return true; + } catch (Exception e) { + alive = false; + return false; + } + } + + // Serialize writes: the reader thread (pongs, fetch replies) and the flusher (INV) share out. + private synchronized void sendApp(byte[] appMessageBytes) throws IOException { + writeFrame(out, UpgradeController.codeSendData(peerVersion, appMessageBytes)); + } + + void close() { + alive = false; + closeQuietly(socket); + } + } + + // ---- Tor SOCKS5 tunnel + native TRON P2P handshake helpers ----------------------------------- + + private Socket openTunnel(InetSocketAddress dest, String isoUser, CommonParameter parameter) + throws IOException { + Socket socket = new Socket(); + socket.connect( + new InetSocketAddress(parameter.getTorSocksHost(), parameter.getTorSocksPort()), + parameter.getTorConnectTimeout()); + socket.setSoTimeout(parameter.getTorReadTimeout()); + socksHandshake(socket.getInputStream(), socket.getOutputStream(), dest, + parameter.isTorCircuitIsolation(), isoUser); + return socket; + } + + private void socksHandshake(InputStream in, OutputStream out, InetSocketAddress dest, + boolean isolation, String isoUser) throws IOException { + if (isolation) { + out.write(new byte[] {SOCKS_VERSION, 1, SOCKS_AUTH_USERPASS}); + } else { + out.write(new byte[] {SOCKS_VERSION, 1, SOCKS_AUTH_NONE}); + } + out.flush(); + byte[] methodReply = readFully(in, 2); + int method = methodReply[1] & 0xff; + + if (isolation) { + if (method != SOCKS_AUTH_USERPASS) { + throw new IOException("SOCKS proxy rejected user/pass auth (circuit isolation)"); + } + byte[] user = isoUser.getBytes(StandardCharsets.US_ASCII); + byte[] pass = {0}; + out.write(0x01); + out.write(user.length); + out.write(user); + out.write(pass.length); + out.write(pass); + out.flush(); + byte[] authReply = readFully(in, 2); + if ((authReply[1] & 0xff) != 0x00) { + throw new IOException("SOCKS proxy auth failed"); + } + } else if (method != SOCKS_AUTH_NONE) { + throw new IOException("SOCKS proxy requires authentication"); + } + + byte[] addr = dest.getAddress().getAddress(); + int atyp = addr.length == 4 ? 0x01 : 0x04; + int port = dest.getPort(); + out.write(SOCKS_VERSION); + out.write(SOCKS_CMD_CONNECT); + out.write(0x00); + out.write(atyp); + out.write(addr); + out.write((port >> 8) & 0xff); + out.write(port & 0xff); + out.flush(); + + byte[] head = readFully(in, 4); + if ((head[1] & 0xff) != 0x00) { + throw new IOException("SOCKS connect to " + dest + " failed, code=" + (head[1] & 0xff)); + } + int boundLen = (head[3] & 0xff) == 0x01 ? 4 : (head[3] & 0xff) == 0x04 ? 16 : 0; + readFully(in, boundLen + 2); // skip BND.ADDR + BND.PORT + } + + // libp2p transport Hello handshake; returns the peer's protocol version. Advertises a random, + // non-identifying public IP (never our real one) so tunnels can't be clustered by origin address. + private int transportHandshake(InputStream in, OutputStream out, int networkId) throws Exception { + org.tron.p2p.protos.Discover.Endpoint from = org.tron.p2p.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom(randomAnonIp().getBytes(StandardCharsets.UTF_8))) + .setPort(18888) + .build(); + Connect.HelloMessage hello = Connect.HelloMessage.newBuilder() + .setFrom(from) + .setNetworkId(networkId) + .setCode(0) + .setVersion(Parameter.version) + .setTimestamp(System.currentTimeMillis()) + .build(); + writeFrame(out, prependType(HELLO_TYPE, hello.toByteArray())); + + byte[] peerFrame = readFrame(in); + if (peerFrame.length == 0 || peerFrame[0] != HELLO_TYPE) { + throw new IOException("expected Hello from peer"); + } + Connect.HelloMessage peerHello = + Connect.HelloMessage.parseFrom(subarray(peerFrame, 1, peerFrame.length)); + if (peerHello.getCode() != 0 || peerHello.getNetworkId() != networkId) { + throw new IOException("peer rejected handshake (code=" + peerHello.getCode() + + ", networkId=" + peerHello.getNetworkId() + ")"); + } + return peerHello.getVersion(); + } + + // Read frames until the peer's application-level P2P_HELLO arrives, returning it (or null). + private Protocol.HelloMessage readAppHello(InputStream in, int peerVersion) throws Exception { + long deadline = System.currentTimeMillis() + Parameter.KEEP_ALIVE_TIMEOUT; + while (System.currentTimeMillis() < deadline) { + byte[] decoded = UpgradeController.decodeReceiveData(peerVersion, readFrame(in)); + if (decoded.length > 0 && decoded[0] == MessageTypes.P2P_HELLO.asByte()) { + return Protocol.HelloMessage.parseFrom(subarray(decoded, 1, decoded.length)); + } + } + return null; + } + + // ---- framing helpers (protobuf varint32 length prefix, matching libp2p's framing) ------------ + + private static void writeFrame(OutputStream out, byte[] payload) throws IOException { + writeRawVarint32(out, payload.length); + out.write(payload); + out.flush(); + } + + private static byte[] readFrame(InputStream in) throws IOException { + int len = readRawVarint32(in); + if (len < 0 || len > Parameter.MAX_MESSAGE_LENGTH) { + throw new IOException("invalid frame length: " + len); + } + return readFully(in, len); + } + + private static void writeRawVarint32(OutputStream out, int value) throws IOException { + while (true) { + if ((value & ~0x7f) == 0) { + out.write(value); + return; + } + out.write((value & 0x7f) | 0x80); + value >>>= 7; + } + } + + private static int readRawVarint32(InputStream in) throws IOException { + int result = 0; + int shift = 0; + while (shift < 32) { + int b = in.read(); + if (b == -1) { + throw new EOFException("stream closed while reading frame length"); + } + result |= (b & 0x7f) << shift; + if ((b & 0x80) == 0) { + return result; + } + shift += 7; + } + throw new IOException("varint32 too long"); + } + + private static byte[] readFully(InputStream in, int len) throws IOException { + byte[] buf = new byte[len]; + int off = 0; + while (off < len) { + int n = in.read(buf, off, len - off); + if (n == -1) { + throw new EOFException("stream closed while reading " + len + " bytes"); + } + off += n; + } + return buf; + } + + private static byte[] prependType(byte type, byte[] data) { + byte[] out = new byte[data.length + 1]; + out[0] = type; + System.arraycopy(data, 0, out, 1, data.length); + return out; + } + + private static byte[] subarray(byte[] data, int from, int to) { + byte[] out = new byte[to - from]; + System.arraycopy(data, from, out, 0, to - from); + return out; + } + + // A random, well-formed, non-multicast public IPv4 (never our real IP). Passes the peer's + // NetUtil.validNode check and, being fresh per tunnel, is not a stable clustering handle. + private static String randomAnonIp() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + while (true) { + int a = r.nextInt(1, 224); // 1..223: skip 0.x and multicast/reserved 224+ + int b = r.nextInt(0, 256); + int c = r.nextInt(0, 256); + int d = r.nextInt(1, 255); // 1..254: skip .0 network and .255 broadcast + if (isPublicIpv4(a, b, c, d)) { + return a + "." + b + "." + c + "." + d; + } + } + } + + // True only for a globally-routable public IPv4 (excludes private, loopback, link-local, CGNAT, + // documentation, benchmarking and other reserved ranges). We advertise such an address so peers + // accept it (NetUtil.validNode) while it still reveals nothing about us. + private static boolean isPublicIpv4(int a, int b, int c, int d) { + if (a == 10 || a == 127) { + return false; // private 10/8, loopback 127/8 + } + if (a == 100 && b >= 64 && b <= 127) { + return false; // CGNAT 100.64/10 + } + if (a == 169 && b == 254) { + return false; // link-local 169.254/16 + } + if (a == 172 && b >= 16 && b <= 31) { + return false; // private 172.16/12 + } + if (a == 192 && (b == 0 || b == 88 || b == 168)) { + return false; // 192.0.0/24, 192.0.2/24, 192.88.99/24 (6to4), 192.168/16 + } + if (a == 198 && (b == 18 || b == 19 || b == 51)) { + return false; // benchmarking 198.18/15, doc 198.51.100/24 + } + if (a == 203 && b == 0 && c == 113) { + return false; // documentation 203.0.113/24 + } + return true; + } + + private static void closeQuietly(Socket socket) { + if (socket != null) { + try { + socket.close(); + } catch (IOException ignored) { + // ignore + } + } + } +} diff --git a/framework/src/main/resources/config.conf b/framework/src/main/resources/config.conf index d00f334f4ce..1aac2114e05 100644 --- a/framework/src/main/resources/config.conf +++ b/framework/src/main/resources/config.conf @@ -198,6 +198,27 @@ node { isOpenFullTcpDisconnect = false inactiveThreshold = 600 //seconds + # Anonymous outbound transaction broadcast over Tor. + # When enabled, transactions submitted to THIS node (via its own broadcast API) are validated + # locally and then, instead of being advertised to directly-connected P2P peers (which exposes + # this node's IP as the origin), are pushed via the native P2P protocol (Hello -> INV -> fetch + # -> Transaction) through the local Tor SOCKS5 proxy to a random subset of nodes learned via the + # normal (UDP) discovery layer that are NOT currently used as sync peers. Those nodes are stock, + # unmodified TRON nodes and re-gossip the transaction as usual. Block sync and normal P2P + # traffic are unaffected. The feature is off by default. + tor { + enabled = false + socksHost = "127.0.0.1" + socksPort = 9050 # default Tor SOCKS5 port + connectTimeout = 30000 # ms; Tor circuits are slow + readTimeout = 30000 # ms + broadcastCount = 2 # number of discovered non-sync nodes to push each tx to + circuitIsolation = true # use a distinct Tor circuit per connection (SOCKS auth isolation) + controlPort = 0 # Tor ControlPort; if > 0, the node signals NEWNYM to rebuild + # circuits (fresh exit IPs) when no relay tunnel can be established + controlPassword = "" # password for the Tor ControlPort (HashedControlPassword in torrc) + } + p2p { version = 11111 # Mainnet:11111; Nile:201910292; Shasta:1 } diff --git a/framework/src/test/java/org/tron/core/WalletMockTest.java b/framework/src/test/java/org/tron/core/WalletMockTest.java index 3e0c1a4461d..7b303089dc6 100644 --- a/framework/src/test/java/org/tron/core/WalletMockTest.java +++ b/framework/src/test/java/org/tron/core/WalletMockTest.java @@ -256,6 +256,97 @@ public void testBroadcastTransactionTooManyPending() throws Exception { assertEquals(GrpcAPI.Return.response_code.SERVER_BUSY, ret.getCode()); } + @Test + public void testBroadcastTransactionTorDisabledUsesP2p() throws Exception { + CommonParameter.getInstance().setTorBroadcastEnable(false); + org.tron.core.net.TronNetService netSvc = + mock(org.tron.core.net.TronNetService.class); + org.tron.core.net.service.tor.TorBroadcastService torSvc = + mock(org.tron.core.net.service.tor.TorBroadcastService.class); + when(netSvc.fastBroadcastTransaction(any())).thenReturn(2); + Wallet wallet = routingWallet(netSvc, torSvc); + + GrpcAPI.Return ret = wallet.broadcastTransaction(routingTransaction()); + + assertEquals(GrpcAPI.Return.response_code.SUCCESS, ret.getCode()); + Mockito.verify(netSvc, Mockito.times(1)).fastBroadcastTransaction(any()); + Mockito.verify(torSvc, Mockito.never()).broadcastTransaction(any()); + } + + @Test + public void testBroadcastTransactionTorEnabledUsesTor() throws Exception { + CommonParameter.getInstance().setTorBroadcastEnable(true); + try { + org.tron.core.net.TronNetService netSvc = + mock(org.tron.core.net.TronNetService.class); + org.tron.core.net.service.tor.TorBroadcastService torSvc = + mock(org.tron.core.net.service.tor.TorBroadcastService.class); + when(torSvc.broadcastTransaction(any())).thenReturn(2); + Wallet wallet = routingWallet(netSvc, torSvc); + + GrpcAPI.Return ret = wallet.broadcastTransaction(routingTransaction()); + + assertEquals(GrpcAPI.Return.response_code.SUCCESS, ret.getCode()); + Mockito.verify(torSvc, Mockito.times(1)).broadcastTransaction(any()); + Mockito.verify(netSvc, Mockito.never()).fastBroadcastTransaction(any()); + } finally { + CommonParameter.getInstance().setTorBroadcastEnable(false); + } + } + + // Wires a Wallet whose pre-broadcast checks all pass, so broadcastTransaction reaches the + // Tor-vs-P2P routing branch; the two broadcast services are mocks the caller stubs and verifies. + private Wallet routingWallet(org.tron.core.net.TronNetService netSvc, + org.tron.core.net.service.tor.TorBroadcastService torSvc) throws Exception { + Wallet wallet = new Wallet(); + TronNetDelegate netDelegate = mock(TronNetDelegate.class); + when(netDelegate.isBlockUnsolidified()).thenReturn(false); + Manager dbManager = mock(Manager.class); + when(dbManager.isTooManyPending()).thenReturn(false); + org.tron.core.ChainBaseManager cbm = mock(org.tron.core.ChainBaseManager.class); + DynamicPropertiesStore dps = mock(DynamicPropertiesStore.class); + when(cbm.getDynamicPropertiesStore()).thenReturn(dps); + when(dps.supportVM()).thenReturn(false); + when(dps.getAllowProtoFilterNum()).thenReturn(0L); + when(cbm.getNextBlockSlotTime()).thenReturn(0L); + // The P2P path builds a TransactionMessage, which validates the contract against the static + // DynamicPropertiesStore held by Message; provide it so the message can be constructed. + Field messageStore = org.tron.common.overlay.message.Message.class + .getDeclaredField("dynamicPropertiesStore"); + messageStore.setAccessible(true); + messageStore.set(null, dps); + setWalletField(wallet, "tronNetDelegate", netDelegate); + setWalletField(wallet, "dbManager", dbManager); + setWalletField(wallet, "chainBaseManager", cbm); + setWalletField(wallet, "tronNetService", netSvc); + setWalletField(wallet, "torBroadcastService", torSvc); + setWalletField(wallet, "minEffectiveConnection", 0); + setWalletField(wallet, "trxCacheEnable", false); + return wallet; + } + + private static Protocol.Transaction routingTransaction() { + byte[] addr = new byte[21]; + addr[0] = 0x41; + BalanceContract.TransferContract transfer = BalanceContract.TransferContract.newBuilder() + .setOwnerAddress(ByteString.copyFrom(addr)) + .setToAddress(ByteString.copyFrom(addr)) + .setAmount(1L) + .build(); + return Protocol.Transaction.newBuilder() + .setRawData(Protocol.Transaction.raw.newBuilder() + .addContract(Protocol.Transaction.Contract.newBuilder() + .setType(Protocol.Transaction.Contract.ContractType.TransferContract) + .setParameter(Any.pack(transfer)))) + .build(); + } + + private static void setWalletField(Wallet wallet, String name, Object value) throws Exception { + Field field = Wallet.class.getDeclaredField(name); + field.setAccessible(true); + field.set(wallet, value); + } + @Test public void testBroadcastTransactionAlreadyExists() throws Exception { Wallet wallet = new Wallet(); diff --git a/framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java b/framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java new file mode 100644 index 00000000000..0bb5c7cd5d7 --- /dev/null +++ b/framework/src/test/java/org/tron/core/net/service/tor/TorBroadcastServiceTest.java @@ -0,0 +1,566 @@ +package org.tron.core.net.service.tor; + +import com.google.protobuf.ByteString; +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import org.springframework.test.util.ReflectionTestUtils; +import org.tron.common.TestConstants; +import org.tron.common.parameter.CommonParameter; +import org.tron.core.config.args.Args; +import org.tron.core.net.TronNetDelegate; +import org.tron.core.net.TronNetService; +import org.tron.core.net.message.MessageTypes; +import org.tron.core.net.message.adv.FetchInvDataMessage; +import org.tron.core.net.message.adv.InventoryMessage; +import org.tron.p2p.base.Parameter; +import org.tron.p2p.connection.business.upgrade.UpgradeController; +import org.tron.p2p.discover.Node; +import org.tron.p2p.protos.Connect; +import org.tron.p2p.utils.NetUtil; +import org.tron.protos.Protocol; +import org.tron.protos.Protocol.Inventory.InventoryType; +import org.tron.protos.Protocol.Transaction; + +/** + * End-to-end test for {@link TorBroadcastService}: an enqueued transaction must reach a standard + * TRON node over a persistent tunnel using the native P2P protocol (Hello -> P2P_HELLO -> INV + * -> FetchInvData -> TransactionsMessage), tunnelled through a SOCKS5 proxy. A minimal + * in-process SOCKS5 forwarder stands in for Tor, and a stub node speaks just enough of the wire + * protocol to accept the transaction — so the whole routing + framing + compression path runs + * without any external dependency. Delivery is asynchronous (buffer + background flush), so the + * test waits for the stub to receive the transaction. + */ +public class TorBroadcastServiceTest { + + private static final int NETWORK_ID = 11111; + private static final byte HELLO = (byte) 0xfd; + + private StubNode stubNode; + private Socks5Forwarder socksProxy; + private TorBroadcastService service; + private final AtomicBoolean transactionReceived = new AtomicBoolean(false); + + @BeforeClass + public static void initArgs() { + // Initialise CommonParameter as the other net tests do (also sets up global state this test's + // service touches). PeerConnection's static relayNodes is safe regardless of ordering now that + // CommonParameter.fastForwardNodes defaults to an empty list. + Args.setParam(new String[] {}, TestConstants.TEST_CONF); + } + + @Before + public void setUp() throws IOException { + if (Parameter.p2pConfig == null) { + Parameter.p2pConfig = new org.tron.p2p.P2pConfig(); + } + stubNode = new StubNode(transactionReceived); + stubNode.start(); + socksProxy = new Socks5Forwarder(); + socksProxy.start(); + + CommonParameter parameter = CommonParameter.getInstance(); + parameter.setTorSocksHost("127.0.0.1"); + parameter.setTorSocksPort(socksProxy.getPort()); + parameter.setTorConnectTimeout(5000); + parameter.setTorReadTimeout(5000); + parameter.setTorBroadcastEnable(true); + parameter.setTorCircuitIsolation(false); // keep the in-process forwarder auth-free + parameter.setTorBroadcastCount(1); + parameter.setTorControlPort(0); + parameter.setNodeP2pVersion(NETWORK_ID); + } + + @After + public void tearDown() { + if (service != null) { + service.shutdown(); // stop the background threads so they don't leak into other tests + service = null; + } + if (stubNode != null) { + stubNode.close(); + } + if (socksProxy != null) { + socksProxy.close(); + } + } + + @Test + public void transactionDeliveredOverPersistentTunnel() throws Exception { + Node stub = new Node(new InetSocketAddress("127.0.0.1", stubNode.getPort())); + newService(Collections.singletonList(stub)); + + service.broadcastTransaction(sampleTransaction()); + + // Delivery is async (pool maintenance + flush run in the background); wait for the stub. + long deadline = System.currentTimeMillis() + 15000; + while (System.currentTimeMillis() < deadline && !transactionReceived.get()) { + Thread.sleep(100); + } + Assert.assertTrue("stub node never received the transaction over the persistent tunnel", + transactionReceived.get()); + } + + @Test + public void noRelaysNothingDelivered() throws Exception { + newService(Collections.emptyList()); + + service.broadcastTransaction(sampleTransaction()); + + Thread.sleep(2000); + Assert.assertFalse("nothing should be delivered when there are no relay nodes", + transactionReceived.get()); + } + + @Test + public void torProxyOutageThenRecovery() throws Exception { + // Point the SOCKS proxy at a closed port: the Tor daemon is "down". + CommonParameter.getInstance().setTorSocksPort(freePort()); + Node stub = new Node(new InetSocketAddress("127.0.0.1", stubNode.getPort())); + newService(Collections.singletonList(stub)); + + service.broadcastTransaction(sampleTransaction()); + Thread.sleep(2500); + Assert.assertFalse("nothing should be delivered while Tor is down", + transactionReceived.get()); + + // Tor comes back: point the proxy at the working forwarder. No transaction is lost — the + // buffered one must be delivered once tunnels can be built again. + CommonParameter.getInstance().setTorSocksPort(socksProxy.getPort()); + waitUntil(transactionReceived, 30000); + Assert.assertTrue("buffered transaction must be delivered once Tor recovers", + transactionReceived.get()); + } + + @Test + public void relayDegradedThenRecovers() throws Exception { + stubNode.healthy = false; // the only relay drops us during the handshake + Node stub = new Node(new InetSocketAddress("127.0.0.1", stubNode.getPort())); + newService(Collections.singletonList(stub)); + + service.broadcastTransaction(sampleTransaction()); + Thread.sleep(2500); + Assert.assertFalse("nothing should be delivered while the only relay is degraded", + transactionReceived.get()); + + // Relay recovers: the pool must rebuild a tunnel and deliver the buffered transaction. + stubNode.healthy = true; + waitUntil(transactionReceived, 30000); + Assert.assertTrue("buffered transaction must be delivered once the relay recovers", + transactionReceived.get()); + } + + @Test + public void newnymSignalledWhenPoolCannotForm() throws Exception { + TorControlServer control = new TorControlServer(); + control.start(); + CommonParameter.getInstance().setTorControlPort(control.getPort()); + CommonParameter.getInstance().setTorControlPassword(""); + try { + // No relay nodes -> the pool can never form -> after a few empty rounds the node should ask + // Tor for fresh circuits (new exit IPs) via the control port. + newService(Collections.emptyList()); + service.broadcastTransaction(sampleTransaction()); + waitUntil(control.newnymReceived, 40000); + Assert.assertTrue("node should signal NEWNYM when no relay tunnel can be built", + control.newnymReceived.get()); + } finally { + CommonParameter.getInstance().setTorControlPort(0); + control.close(); + } + } + + private static void waitUntil(AtomicBoolean flag, long timeoutMs) throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMs; + while (System.currentTimeMillis() < deadline && !flag.get()) { + Thread.sleep(100); + } + } + + private static int freePort() throws IOException { + try (ServerSocket s = new ServerSocket()) { + s.bind(new InetSocketAddress("127.0.0.1", 0)); + return s.getLocalPort(); + } + } + + private TorBroadcastService newService(java.util.List tableNodes) { + TorBroadcastService svc = new TorBroadcastService(); + TronNetService netService = Mockito.mock(TronNetService.class); + TronNetDelegate netDelegate = Mockito.mock(TronNetDelegate.class); + Mockito.when(netService.getTableNodes()).thenReturn(tableNodes); + Mockito.when(netDelegate.getActivePeer()).thenReturn(Collections.emptyList()); + ReflectionTestUtils.setField(svc, "tronNetService", netService); + ReflectionTestUtils.setField(svc, "tronNetDelegate", netDelegate); + this.service = svc; // tracked so tearDown can shut it down + return svc; + } + + private static Transaction sampleTransaction() { + return Transaction.newBuilder() + .setRawData(Transaction.raw.newBuilder().setTimestamp(1L)) + .build(); + } + + // ---- framing helpers (mirror the service / libp2p wire format) ------------------------------ + + private static void writeFrame(OutputStream out, byte[] payload) throws IOException { + int value = payload.length; + while (true) { + if ((value & ~0x7f) == 0) { + out.write(value); + break; + } + out.write((value & 0x7f) | 0x80); + value >>>= 7; + } + out.write(payload); + out.flush(); + } + + private static byte[] readFrame(InputStream in) throws IOException { + int result = 0; + int shift = 0; + while (shift < 32) { + int b = in.read(); + if (b == -1) { + throw new IOException("closed"); + } + result |= (b & 0x7f) << shift; + if ((b & 0x80) == 0) { + break; + } + shift += 7; + } + byte[] buf = new byte[result]; + int off = 0; + while (off < result) { + int n = in.read(buf, off, result - off); + if (n == -1) { + throw new IOException("closed"); + } + off += n; + } + return buf; + } + + private static byte[] prepend(byte type, byte[] data) { + byte[] out = new byte[data.length + 1]; + out[0] = type; + System.arraycopy(data, 0, out, 1, data.length); + return out; + } + + private static byte[] tail(byte[] data) { + byte[] out = new byte[data.length - 1]; + System.arraycopy(data, 1, out, 0, out.length); + return out; + } + + /** A stub standard TRON node: handshakes, keeps the tunnel open, records receipt of the tx. */ + private static final class StubNode implements Closeable { + + private final ServerSocket serverSocket; + private final AtomicBoolean received; + // When false the relay is "degraded": it drops the connection during the handshake. + volatile boolean healthy = true; + + StubNode(AtomicBoolean received) throws IOException { + this.received = received; + this.serverSocket = new ServerSocket(); + this.serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); + } + + int getPort() { + return serverSocket.getLocalPort(); + } + + void start() { + Thread t = new Thread(() -> { + while (!serverSocket.isClosed()) { + try { + Socket client = serverSocket.accept(); + Thread worker = new Thread(() -> { + try { + serve(client); + } catch (Exception ignored) { + // connection closed on teardown + } finally { + try { + client.close(); + } catch (IOException ignored) { + // ignore + } + } + }); + worker.setDaemon(true); + worker.start(); + } catch (Exception e) { + return; // server socket closed on teardown + } + } + }, "stub-tron-node"); + t.setDaemon(true); + t.start(); + } + + private void serve(Socket client) throws Exception { + InputStream in = client.getInputStream(); + OutputStream out = client.getOutputStream(); + + // Transport Hello (uncompressed); reject if the client's "from" endpoint is invalid. + Connect.HelloMessage clientHello = Connect.HelloMessage.parseFrom(tail(readFrame(in))); + if (!NetUtil.validNode(NetUtil.getNode(clientHello.getFrom()))) { + return; + } + if (!healthy) { + return; // degraded relay: drop the connection mid-handshake + } + Connect.HelloMessage hello = Connect.HelloMessage.newBuilder() + .setFrom(Connect.HelloMessage.getDefaultInstance().getFrom()) + .setNetworkId(NETWORK_ID).setCode(0).setVersion(1) + .setTimestamp(System.currentTimeMillis()).build(); + // rebuild with a valid endpoint + org.tron.p2p.protos.Discover.Endpoint ep = org.tron.p2p.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom("203.0.113.7".getBytes())).setPort(18888).build(); + hello = hello.toBuilder().setFrom(ep).build(); + writeFrame(out, prepend(HELLO, hello.toByteArray())); + + // Application-level P2P_HELLO advertising a head block (compressed, post-handshake). + Protocol.HelloMessage.BlockId block = Protocol.HelloMessage.BlockId.newBuilder() + .setHash(ByteString.copyFrom(new byte[32])).setNumber(1L).build(); + org.tron.protos.Discover.Endpoint appFrom = org.tron.protos.Discover.Endpoint.newBuilder() + .setNodeId(ByteString.copyFrom(NetUtil.getNodeId())) + .setAddress(ByteString.copyFrom("203.0.113.7".getBytes())).setPort(18888).build(); + Protocol.HelloMessage appHello = Protocol.HelloMessage.newBuilder() + .setFrom(appFrom).setVersion(NETWORK_ID).setTimestamp(System.currentTimeMillis()) + .setGenesisBlockId(block).setSolidBlockId(block).setHeadBlockId(block) + .setNodeType(0).setLowestBlockNum(0).build(); + writeFrame(out, UpgradeController.codeSendData(1, + prepend(MessageTypes.P2P_HELLO.asByte(), appHello.toByteArray()))); + + // Read frames until the client advertises the tx (INV), then request it and read the batch. + long deadline = System.currentTimeMillis() + 15000; + while (System.currentTimeMillis() < deadline) { + byte[] frame = UpgradeController.decodeReceiveData(1, readFrame(in)); + if (frame.length == 0) { + continue; + } + if (frame[0] == MessageTypes.INVENTORY.asByte()) { + InventoryMessage invMessage = new InventoryMessage(tail(frame)); + FetchInvDataMessage fetch = + new FetchInvDataMessage(invMessage.getHashList(), InventoryType.TRX); + writeFrame(out, UpgradeController.codeSendData(1, fetch.getSendBytes())); + } else if (frame[0] == MessageTypes.TRXS.asByte()) { + received.set(true); + return; + } + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException ignored) { + // ignore + } + } + } + + /** Minimal Tor ControlPort stub: accepts AUTHENTICATE then records a SIGNAL NEWNYM. */ + private static final class TorControlServer implements Closeable { + + private final ServerSocket serverSocket; + final AtomicBoolean newnymReceived = new AtomicBoolean(false); + + TorControlServer() throws IOException { + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); + } + + int getPort() { + return serverSocket.getLocalPort(); + } + + void start() { + Thread t = new Thread(() -> { + while (!serverSocket.isClosed()) { + try { + Socket client = serverSocket.accept(); + Thread worker = new Thread(() -> handle(client)); + worker.setDaemon(true); + worker.start(); + } catch (IOException e) { + return; + } + } + }, "test-tor-control"); + t.setDaemon(true); + t.start(); + } + + private void handle(Socket client) { + try { + BufferedReader in = new BufferedReader(new InputStreamReader( + client.getInputStream(), StandardCharsets.US_ASCII)); + OutputStream out = client.getOutputStream(); + String line; + while ((line = in.readLine()) != null) { + if (line.startsWith("AUTHENTICATE") || line.startsWith("SIGNAL NEWNYM")) { + if (line.startsWith("SIGNAL NEWNYM")) { + newnymReceived.set(true); + } + out.write("250 OK\r\n".getBytes(StandardCharsets.US_ASCII)); + out.flush(); + } + } + } catch (IOException ignored) { + // connection closed + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException ignored) { + // ignore + } + } + } + + /** Minimal no-auth SOCKS5 forwarder: CONNECT, then blindly pipe bytes both ways. */ + private static final class Socks5Forwarder implements Closeable { + + private final ServerSocket serverSocket; + + Socks5Forwarder() throws IOException { + serverSocket = new ServerSocket(); + serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)); + } + + int getPort() { + return serverSocket.getLocalPort(); + } + + void start() { + Thread t = new Thread(() -> { + while (!serverSocket.isClosed()) { + try { + Socket client = serverSocket.accept(); + Thread worker = new Thread(() -> handle(client)); + worker.setDaemon(true); + worker.start(); + } catch (IOException e) { + return; + } + } + }, "test-socks5"); + t.setDaemon(true); + t.start(); + } + + private void handle(Socket client) { + try { + InputStream in = client.getInputStream(); + OutputStream out = client.getOutputStream(); + readNBytes(in, 1); // version + int nMethods = in.read(); + readNBytes(in, nMethods); + out.write(new byte[] {0x05, 0x00}); + out.flush(); + + readNBytes(in, 3); // ver, cmd, rsv + int atyp = in.read(); + byte[] addr; + if (atyp == 0x01) { + addr = readNBytes(in, 4); + } else if (atyp == 0x03) { + addr = readNBytes(in, in.read()); + } else { + addr = readNBytes(in, 16); + } + int port = ((in.read() & 0xff) << 8) | (in.read() & 0xff); + String host = atyp == 0x03 ? new String(addr, java.nio.charset.StandardCharsets.US_ASCII) + : java.net.InetAddress.getByAddress(addr).getHostAddress(); + + Socket target = new Socket(host, port); + out.write(new byte[] {0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0}); + out.flush(); + pipe(client, target); + pipe(target, client); + } catch (IOException e) { + closeQuietly(client); + } + } + + private void pipe(Socket from, Socket to) { + Thread t = new Thread(() -> { + byte[] buf = new byte[4096]; + try { + InputStream src = from.getInputStream(); + OutputStream dst = to.getOutputStream(); + int n; + while ((n = src.read(buf)) != -1) { + dst.write(buf, 0, n); + dst.flush(); + } + } catch (IOException ignored) { + // closed + } finally { + closeQuietly(from); + closeQuietly(to); + } + }); + t.setDaemon(true); + t.start(); + } + + private static byte[] readNBytes(InputStream in, int n) throws IOException { + byte[] buf = new byte[n]; + int off = 0; + while (off < n) { + int r = in.read(buf, off, n - off); + if (r == -1) { + throw new IOException("closed"); + } + off += r; + } + return buf; + } + + private static void closeQuietly(Socket socket) { + try { + socket.close(); + } catch (IOException ignored) { + // ignore + } + } + + @Override + public void close() { + try { + serverSocket.close(); + } catch (IOException ignored) { + // ignore + } + } + } +}