How I spend my days Mempool watching (Part 1): Transaction Prediction through EVM Tracing

How I went from bird watching to mempool watching, and how I’m building my sandwich bot now

Solid Quant
Jul 31, 2023
You know what you have to do

Today, I’ll be discussing the importance of mempool watching, a powerful technique for understanding people’s intentions through the pending transactions in the mempool. Knowing what other users are about to do before their transactions are mined is invaluable for anyone building on-chain strategies.

In this blog post, I’ll share a real problem I encountered while building my sandwich bot and demonstrate how I successfully tackled it using EVM tools like EVM tracing.

By using the EVM tracing tools presented in this blog post, you can learn in advance how a pending transaction will change the blockchain state once it’s added to a new block.

The techniques learned here will enable you to easily:

I’ll call this task: Transaction Prediction.

Let’s get started right away 🧑‍💻

As an aspiring MEV searcher, I’ve spent a long time bird watching.

Source: Engadget

No, not this bird.

This bird (the bird is no more).

Following people like @bertcmiller (https://twitter.com/bertcmiller) on Twitter and learning about the dangers of MEV helped me a lot before I really got my hands dirty.

As someone building a sandwich bot, you have to get used to watching the following stream of data:

Simple pending transaction stream + EVM tracing (stateDiff)

Let’s load up our favorite IDE, but first, let’s understand what problem we are trying to solve.

Problem #1

During the development of my sandwich bot, I encountered the need to create an efficient method for determining the storage values (blockchain state) that a pending transaction could impact once mined.

In my specific case, I aimed to monitor all WETH pool reserve changes on Uniswap V2/V3 and then execute either a sandwich or an arbitrage strategy based on that transaction.

However, anyone who has previously built a sandwich bot, whether basic or advanced, knows the cumbersome process of decoding pending transactions and simulating their effects on DEX pool states.

Normally, we decode the “data” field of a transaction using JavaScript, as demonstrated below:

Taken from libevm/subway: subway/src/bot/parse.js
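Here is a rough Python sketch of the same idea, for reference. It is not the subway code. It decodes the calldata of a swapExactETHForTokens(uint256,address[],address,uint256) call by hand, using only the standard library. The helper name decode_swap_exact_eth_for_tokens is hypothetical; 0x7ff36ab5 is the 4-byte selector of that Router02 function.

```python
SWAP_EXACT_ETH_FOR_TOKENS = "7ff36ab5"  # selector of swapExactETHForTokens(uint256,address[],address,uint256)


def _word(args: bytes, index: int) -> bytes:
    """Return the index-th 32-byte ABI word of the encoded arguments."""
    return args[32 * index : 32 * (index + 1)]


def decode_swap_exact_eth_for_tokens(calldata_hex: str):
    """Decode swapExactETHForTokens calldata; return None for any other function."""
    raw = bytes.fromhex(calldata_hex.removeprefix("0x"))
    if raw[:4].hex() != SWAP_EXACT_ETH_FOR_TOKENS:
        return None
    args = raw[4:]
    amount_out_min = int.from_bytes(_word(args, 0), "big")
    path_offset = int.from_bytes(_word(args, 1), "big")  # byte offset of the dynamic array
    to = "0x" + _word(args, 2)[12:].hex()  # an address is the last 20 bytes of its word
    deadline = int.from_bytes(_word(args, 3), "big")
    start = path_offset // 32
    length = int.from_bytes(_word(args, start), "big")
    path = ["0x" + _word(args, start + 1 + i)[12:].hex() for i in range(length)]
    return {"amountOutMin": amount_out_min, "path": path, "to": to, "deadline": deadline}
```

Every other Router02 function needs its own selector and argument layout. That is where this approach starts to hurt.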

🤷‍♀️ Well, there’s no problem with this, right?

Of course, there’s no problem. However, with the aforementioned method, you can only identify transactions that call the specific function “swapExactETHForTokens.”

Now take a quick look at Uniswap V2’s periphery contract, UniswapV2Router02.sol, and you’ll notice a multitude of functions that can alter ETH-related pool reserves:

  1. Liquidity related:
     • addLiquidityETH
     • removeLiquidityETH
     • removeLiquidityETHWithPermit
     • removeLiquidityETHSupportingFeeOnTransferTokens
     • removeLiquidityETHWithPermitSupportingFeeOnTransferTokens
  2. Swap related:
     • swapExactETHForTokens
     • swapTokensForExactETH
     • swapExactTokensForETH
     • swapETHForExactTokens
     • swapExactETHForTokensSupportingFeeOnTransferTokens
     • swapExactTokensForETHSupportingFeeOnTransferTokens

Changing the reserve amount of a pool changes the price of the tokens in that pool, so we’d like to simulate every pending transaction that might affect the reserves. But writing a handler for each of these functions is already disheartening, and it simply doesn’t scale. (Plus, with the advent of Universal Router, decoding calls that go through Router02 is no longer enough. It gets worse when you consider V3 pools and other protocols that use different AMM models.)

Q: How do we find out if the transaction of interest will affect WETH reserves of a given pool? Is there a way to figure this out without having to decode any transaction data? Just tell me whether this TX will touch WETH related pools or not!

Here comes EVM tracing to the rescue. 💪

What is EVM Tracing?

In simple terms, EVM tracing is a way to track and understand what happens in the Ethereum Virtual Machine (EVM) when you execute a transaction.

A smart contract call runs as a sequence of opcodes executed by the EVM. By tracing a transaction’s execution step by step, EVM tracing lets you monitor two key aspects:

  1. Gas usage: how much gas your transaction used,
  2. State changes: how your transaction alters the blockchain’s storage or state, whether through value transfers or contract execution.

This is easier to understand with some sample EVM tracing code, which I’ll first show in Python. Then I’ll use Rust to show how easily the same thing can be done in other languages.

To ensure the sample code works as expected for all readers, we’ll use Alchemy as the HTTPProvider. Note that Geth style traces are requested with “debug_traceTransaction,” while Parity style traces use the trace_* namespace. In this post, that means “trace_replayTransaction” in Python and “trace_call” in Rust. (We’ll cover how to use both styles.)

You can get more information regarding the two in:

At the time of writing, close to 64% of Ethereum clients run Geth, so chances are you’re also running Geth on your local machine. But Parity style clients are gaining traction as more and more people use Nethermind, Erigon, Besu, Reth, etc.

https://www.ethernodes.org/

EVM Tracing in action using Python

Let’s type some code now.

First, install the dependencies required to run this example script:

pip install aiohttp web3 websockets

Let’s look at Geth style tracing first:

import json
import asyncio
import aiohttp
import websockets

# use your Alchemy API Keys
http_url = '<ALCHEMY_HTTPS_URL>'
ws_url = '<ALCHEMY_WSS_URL>'


async def geth_style_tracing(tx_hash: str):
    async with aiohttp.ClientSession() as session:
        req = {
            'id': 1,
            'method': 'debug_traceTransaction',
            'jsonrpc': '2.0',
            'params': [
                tx_hash,
                {'tracer': 'prestateTracer'}
            ]
        }
        request = await session.post(http_url, json=req)
        res = await request.json()

    # Error response: {'jsonrpc': '2.0', 'id': 1, 'error': {'code': -32000, 'message': 'transaction not found'}}
    result = res.get('result')

    if result:
        addresses_touched = list(result.keys())
        print('Geth style: ', addresses_touched)
        print(res)


async def main():
    while True:
        try:
            async with websockets.connect(ws_url) as ws:
                subscription = {
                    'jsonrpc': '2.0',
                    'id': 1,
                    'method': 'eth_subscribe',
                    'params': ['newPendingTransactions']
                }

                await ws.send(json.dumps(subscription))
                _ = await ws.recv()  # subscription confirmation

                while True:
                    msg = await asyncio.wait_for(ws.recv(), timeout=60 * 10)
                    response = json.loads(msg)
                    tx_hash = response['params']['result']

                    await geth_style_tracing(tx_hash)
        except Exception:
            # asyncio.sleep yields to the event loop while we wait to reconnect
            await asyncio.sleep(2)
            print('reconnecting...')


if __name__ == '__main__':
    asyncio.run(main())

In the sample code above, we:

  1. connect to the WebSocket endpoint provided by Alchemy,
  2. subscribe to “newPendingTransactions” to receive transaction hashes from the mempool,
  3. pass the hash of each pending transaction to “geth_style_tracing”,
  4. which sends a “debug_traceTransaction” request using the “prestateTracer”.

The node responds with either an error or the state touched by the transaction.

An error will look like:

{'jsonrpc': '2.0', 'id': 1, 'error': {'code': -32000, 'message': 'transaction not found'}}

So we need to check whether the “result” field exists in our response. If it does, we get access to data like the following:

This looks pretty intimidating at first, but it really isn’t. If you look closely, the “result” field has one key per Ethereum account the transaction touches. These include both externally owned accounts (EOAs) and contract accounts (CAs). Each key maps to that account’s state as captured by the prestateTracer: its balance, nonce, code, and the storage slots the transaction accesses. (By default, prestateTracer reports the state before execution. Passing {"tracerConfig": {"diffMode": true}} alongside the tracer returns both the pre- and post-execution state.)

From this data, we’re interested in the account keys and their storage entries. The keys in the storage dictionary are the hex representation of storage slots, so:

0x0000000000000000000000000000000000000000000000000000000000000000 (32-byte string)

would represent the 0th slot.
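Turning those keys into slot numbers is a one-liner. This is a minimal sketch, assuming result has the prestateTracer shape described above (address → {balance, nonce, code, storage}). slot_index and storage_slots_by_account are hypothetical helpers.

```python
def slot_index(slot_hex: str) -> int:
    """Convert a 32-byte hex storage key (as returned by the tracer) to an integer slot."""
    return int(slot_hex, 16)


def storage_slots_by_account(result: dict) -> dict:
    """Map each account in a prestateTracer result to the sorted storage slots it lists."""
    return {
        address: sorted(slot_index(key) for key in account.get("storage", {}))
        for address, account in result.items()
    }
```

Accounts with no storage entries, such as plain EOAs, simply map to an empty list.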

Not so difficult, right? Now let’s look at how to make a Parity style trace call. (In my Rust version, I’ll also be using Parity style trace calls. Unlike Geth style traces, they don’t return the full bytecode of every account touched, so the response is easier to deal with.)

A Parity style trace call looks like this:

async def parity_style_tracing(tx_hash: str):
    async with aiohttp.ClientSession() as session:
        req = {
            'id': 1,
            'method': 'trace_replayTransaction',
            'jsonrpc': '2.0',
            'params': [tx_hash, ['stateDiff']]
        }
        request = await session.post(http_url, json=req)
        res = await request.json()

    result = res.get('result')

    if result:
        state_diff = result['stateDiff']
        addresses_touched = list(state_diff.keys())
        print('Parity style: ', addresses_touched)
        print(res)

The main difference is the method, which is now “trace_replayTransaction”. In the params, you also add the “stateDiff” trace type so that the node reports state changes. The touched accounts come back nested under result['stateDiff'].

We can see that the response we get back is quite lightweight compared to that of Geth tracing. This is because using the Parity style tracing, you can focus on state changes only, not having to worry about other fields like “code”.
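Parity style storage diffs carry explicit before and after values, so pulling out the modified slots takes only a few lines. This is a minimal sketch with these assumptions:

- state_diff is result['stateDiff'] from parity_style_tracing.
- Each storage entry uses Parity’s diff markers: “=” (unchanged), {"+": value}, {"-": value}, or {"*": {"from": ..., "to": ...}}.
- Treating “+” and “-” as changes from or to zero is this sketch’s own choice.

changed_storage is a hypothetical helper.

```python
def changed_storage(state_diff: dict) -> dict:
    """Return {address: {slot: (before, after)}} for storage slots that changed."""
    changes = {}
    for address, account in state_diff.items():
        for slot, diff in account.get("storage", {}).items():
            if not isinstance(diff, dict):
                continue  # "=" means the slot is unchanged
            if "*" in diff:
                before, after = diff["*"]["from"], diff["*"]["to"]
            elif "+" in diff:
                before, after = "0x0", diff["+"]  # assumption: created from zero
            elif "-" in diff:
                before, after = diff["-"], "0x0"  # assumption: cleared to zero
            else:
                continue
            changes.setdefault(address, {})[int(slot, 16)] = (int(before, 16), int(after, 16))
    return changes
```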

To compare the two styles, I’ll update the main function a little so that it runs both on every pending transaction:

async def main():
    while True:
        try:
            async with websockets.connect(ws_url) as ws:
                subscription = {
                    'jsonrpc': '2.0',
                    'id': 1,
                    'method': 'eth_subscribe',
                    'params': ['newPendingTransactions']
                }

                await ws.send(json.dumps(subscription))
                _ = await ws.recv()  # subscription confirmation

                while True:
                    msg = await asyncio.wait_for(ws.recv(), timeout=60 * 10)
                    response = json.loads(msg)
                    tx_hash = response['params']['result']

                    await geth_style_tracing(tx_hash)
                    await parity_style_tracing(tx_hash)

                    print('\n')
        except Exception:
            # asyncio.sleep yields to the event loop while we wait to reconnect
            await asyncio.sleep(2)
            print('reconnecting...')

Output:

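We can see that the Geth style sometimes outputs more touched addresses, because prestateTracer also lists accounts the transaction only reads. Still, the addresses reported by the Parity style should also show up in the Geth style output.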
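Rather than eyeballing the two printed lists, you can compare the address sets directly. This is a minimal sketch, assuming geth_result is the “result” object from the prestateTracer call and parity_result is the “result” object from trace_replayTransaction. compare_touched is a hypothetical helper. Addresses are lowercased in case the two responses format them differently.

```python
def compare_touched(geth_result: dict, parity_result: dict) -> dict:
    """Split touched addresses into those seen by both styles and those seen by only one."""
    geth = {address.lower() for address in geth_result}
    parity = {address.lower() for address in parity_result["stateDiff"]}
    return {
        "common": geth & parity,
        "geth_only": geth - parity,
        "parity_only": parity - geth,
    }
```

A non-empty "parity_only" set would be the interesting case to investigate.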

How to use EVM Tracing in sandwich bots using Rust

Finally, I’m going to demonstrate how this can be utilized in sandwich bots using Rust.

I know not all readers are familiar with Rust, so I try to stick to Python/JavaScript for most of my other examples. For this one, though, I wanted to show everyone how Rust can improve their code performance. Also, a lot of the tooling I’ll use comes from Paradigm.xyz (https://www.paradigm.xyz/), and their projects are mostly written in Rust.

I’ll try to explain what my code is doing line-by-line, so that readers who don’t know Rust can also follow along and apply it to their bots written in other languages, because all the concepts shared here should map naturally to other ecosystems as well.

Along the way, though, readers will realize that the EVM tooling within the Rust community is awesome, and that it’s worth looking into if they haven’t yet.

But note that this isn’t to urge everyone to use lower level languages, because that’s not necessary at all. I still use Python for strategies that don’t require heavy computations, and get by with just using Numpy.

In this post, we are going to build the system below:

  1. The program, on startup, will load all Uniswap V3 pools by parsing historical event logs from the Uniswap V3 Factory contract.
  2. It, then, will connect to two async streams, one updating new headers (block), and the other retrieving pending transactions from the mempool.
  3. It will call “trace_call” with the “stateDiff” trace type on each pending transaction, checking whether the balance of our target token (in our case, WETH) held by any Uniswap V3 pool changes.
  4. If a pool’s WETH balance changes, we print the change to the console. We only print increases, which typically mean a WETH → token swap.

🙏 Before we begin though, I want to mention beforehand that this sample code borrows from three popular Rust based projects:

  • cfmms-rs:
  • rusty-sando:
  • artemis:

These three projects have helped me so much getting started, so I recommend that anyone entering the MEV arena with Rust should give them a look. You can use this post as a starting point.

Project setup

Now, create a new Rust project by typing the command:

cargo new revm_playground

I’m naming this project revm_playground because I’ll be working with REVM in the next post, using the template code we create today.

You can always visit my Github repo while following along:

Create three files in the src directory:

  • lib.rs
  • main.rs
  • trace.rs

Next, copy and paste the following into Cargo.toml. These are the dependencies we’ll be using in our project (Cargo.toml):

[package]
name = "revm-playground"
version = "0.1.0"
edition = "2021"

[dependencies]
rand = "0.8.5"
anyhow = "1.0.71"
dashmap = "5.4.0"
hex-literal = "0.4"
hex = "0.4.3"
bytes = "1.4.0"
dotenv = "0.15.0"
auto_impl = { version = "1.1", default-features = false }

# ethers
ethers-providers = "2.0"
ethers-core = "2.0"
ethers-contract = { version = "2.0", default-features = false }
ethers = {version = "2.0", features = ["abigen", "ws", "rustls"]}

# async
futures = "0.3.27"
tokio = { version = "1.28", features = ["full"] }
tokio-stream = { version = "0.1", features = ['sync'] }

# logging
log = "0.4.17"
fern = {version = "0.6.2", features = ["colored"]}
chrono = "0.4.23"
colored = "2.0.0"

# github
cfmms = { git = "https://github.com/0xKitsune/cfmms-rs.git" }

main.rs

Fill in the main.rs file as below (main.rs):

use anyhow::{Ok, Result};
use fern::colors::{Color, ColoredLevelConfig};
use log::LevelFilter;

// Just some logger setup to prettify console prints
pub fn setup_logger() -> Result<()> {
    let colors = ColoredLevelConfig {
        trace: Color::Cyan,
        debug: Color::Magenta,
        info: Color::Green,
        warn: Color::Red,
        error: Color::BrightRed,
        ..ColoredLevelConfig::new()
    };

    fern::Dispatch::new()
        .format(move |out, message, record| {
            out.finish(format_args!(
                "{}[{}] {}",
                chrono::Local::now().format("[%H:%M:%S]"),
                colors.color(record.level()),
                message
            ))
        })
        .chain(std::io::stdout())
        .level(log::LevelFilter::Error)
        .level_for("revm_playground", LevelFilter::Info)
        .apply()?;

    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    setup_logger()?;

    let weth = String::from("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2");

    Ok(())
}

I first create a simple “setup_logger” function to define some settings for my logger to prettify how my logs are printed on the console. (This function isn’t necessary.)

We, then, load our environment variables, which should be defined within our .env file (.env):

WSS_URL=ws://localhost:8546

Change the URL to your Alchemy RPC endpoint. (I’m using Alchemy right now.)

We will use WETH as our target token that we want to track the balance of.

trace.rs

Now, we’ll start building the core logic of our program from within trace.rs.

Let’s start by retrieving all pools that exist on Uniswap V3. I’ll create a function called “mempool_watching” (trace.rs):

// All the packages I'll be using in this file
use anyhow::{anyhow, Result};
use cfmms::{
    checkpoint::sync_pools_from_checkpoint,
    dex::{Dex, DexVariant},
    pool::Pool,
    sync::sync_pairs,
};
use dashmap::DashMap;
use ethers::{
    abi,
    providers::{Provider, Ws},
    types::{Address, BlockNumber, Diff, TraceType, Transaction, H160, H256, U256, U64},
    utils::keccak256,
};
use ethers_providers::Middleware;
use log::info;
use std::{path::Path, str::FromStr, sync::Arc};
use tokio::sync::broadcast::{self, Sender};
use tokio::task::JoinSet;
use tokio_stream::StreamExt;

use crate::utils::calculate_next_block_base_fee;

// Create this function first
pub async fn mempool_watching(target_address: String) -> Result<()> {
    // Setup: Create the WS provider and wrap it in Arc
    let wss_url: String = std::env::var("WSS_URL").unwrap();
    let provider = Provider::<Ws>::connect(wss_url).await?;
    let provider = Arc::new(provider);

    Ok(())
}

Note that I’ll be using a utility function called “calculate_next_block_base_fee”, which we’ll create in a new utils.rs file (utils.rs):

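use ethers::types::U256;
use rand::Rng;

// Estimates the next block's base fee from the latest block's gas data,
// moving the base fee by at most 1/8 per block (EIP-1559)
pub fn calculate_next_block_base_fee(
    gas_used: U256,
    gas_limit: U256,
    base_fee_per_gas: U256,
) -> U256 {
    // The gas target is half the gas limit; guard against dividing by zero
    let mut target_gas_used = gas_limit / 2;
    target_gas_used = if target_gas_used == U256::zero() {
        U256::one()
    } else {
        target_gas_used
    };

    let new_base_fee = {
        if gas_used > target_gas_used {
            // Block was fuller than the target: base fee goes up
            base_fee_per_gas
                + ((base_fee_per_gas * (gas_used - target_gas_used)) / target_gas_used)
                    / U256::from(8u64)
        } else {
            // Block was at or below the target: base fee goes down (or stays)
            base_fee_per_gas
                - ((base_fee_per_gas * (target_gas_used - gas_used)) / target_gas_used)
                    / U256::from(8u64)
        }
    };

    // Add a random 0-8 wei on top of the estimate
    let seed = rand::thread_rng().gen_range(0..9);
    new_base_fee + seed
}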

To use this function, we have to declare the utils module in our lib.rs (lib.rs):

pub mod utils;
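calculate_next_block_base_fee follows the EIP-1559 base fee update rule. The gas target is half the gas limit. The base fee moves toward the direction of (gas used minus target) by at most 1/8 per block. For comparison, here is a minimal Python sketch of the rule as written in EIP-1559. Unlike the Rust version, it omits the random 0 to 8 wei added at the end. It also applies EIP-1559’s minimum increase of 1 wei. next_base_fee is a hypothetical name.

```python
def next_base_fee(gas_used: int, gas_limit: int, base_fee: int) -> int:
    """Next block's base fee (in wei) under the EIP-1559 update rule."""
    target = max(gas_limit // 2, 1)  # gas target = gas limit / elasticity multiplier (2)
    if gas_used == target:
        return base_fee
    if gas_used > target:
        # Fuller than target: increase by up to 1/8, and by at least 1 wei
        return base_fee + max(base_fee * (gas_used - target) // target // 8, 1)
    # Emptier than target: decrease by up to 1/8
    return base_fee - base_fee * (target - gas_used) // target // 8
```

For example, a completely full 30M-gas block at a 100 gwei base fee pushes the next base fee to 112.5 gwei. An empty block drops it to 87.5 gwei.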

I’ve taken care of all the imports up front. I’m using:

cfmms, dashmap, ethers, log, tokio, etc.

❗️ You’ll need to have all the imports to run this project in the end.

After creating the provider, we’ll use cfmms-rs to retrieve all pools ever created from the Uniswap V3 Factory. Add the following to “mempool_watching” function (trace.rs):

// imports...

pub async fn mempool_watching(target_address: String) -> Result<()> {
    // Setup...

    // !! add this !! 🔻
    // Step #1: Using cfmms-rs to sync all pools created on Uniswap V3
    let checkpoint_path = ".cfmms-checkpoint.json";
    let checkpoint_exists = Path::new(checkpoint_path).exists();

    let pools = DashMap::new();

    let dexes_data = [(
        // Uniswap v3
        "0x1F98431c8aD98523631AE4a59f267346ea31F984",
        DexVariant::UniswapV3,
        12369621u64,
    )];
    let dexes: Vec<_> = dexes_data
        .into_iter()
        .map(|(address, variant, number)| {
            Dex::new(H160::from_str(address).unwrap(), variant, number, Some(300))
        })
        .collect();

    let pools_vec = if checkpoint_exists {
        let (_, pools_vec) =
            sync_pools_from_checkpoint(checkpoint_path, 100000, provider.clone()).await?;
        pools_vec
    } else {
        sync_pairs(dexes.clone(), provider.clone(), Some(checkpoint_path)).await?
    };

    for pool in pools_vec {
        pools.insert(pool.address(), pool);
    }

    info!("Uniswap V3 pools synced: {}", pools.len());

    Ok(())
}

Here, we create a checkpoint .json file to cache the pools we’ve loaded once. In dexes_data, we define the factory address and the creation block number for each factory contract.

If the checkpoint file exists, we continue from the cached data. If it doesn’t, we start from block number 12,369,621 (the block in which the Uniswap V3 Factory was deployed) and parse PoolCreated events using the “sync_pairs” function.

When we’re done retrieving all the pool data, we loop through pools_vec (Vec<Pool>) and insert each pool into a DashMap called pools for later use.

🔵 For people that want to dive in deeper:

Update your lib.rs, so that we can use functions defined in trace.rs from our main function (lib.rs):

pub mod trace;

Then, try updating the main function (main.rs):

use anyhow::{Ok, Result};
use fern::colors::{Color, ColoredLevelConfig};
use log::LevelFilter;

// !! add this !! 🔻
use revm_playground::trace::mempool_watching;

// setup_logger function...

#[tokio::main]
async fn main() -> Result<()> {
    dotenv::dotenv().ok();
    setup_logger()?;

    let weth = String::from("0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2");
    mempool_watching(weth).await?; // ◀️ !! add this !!

    Ok(())
}

Run: cargo run, and you’ll see the below:

New headers stream

In trace.rs, add definitions for NewBlock and Event as below (trace.rs):

// imports...

#[derive(Default, Debug, Clone)]
pub struct NewBlock {
    pub number: U64,
    pub gas_used: U256,
    pub gas_limit: U256,
    pub base_fee_per_gas: U256,
    pub timestamp: U256,
}

#[derive(Debug, Clone)]
pub enum Event {
    NewBlock(NewBlock),
    Transaction(Transaction),
}

These types are used by our async streams. This follows the pattern of Artemis, where collectors stream real-time data by sending Event enums to strategies.

However, for this small project, we won’t use such an elaborate structure. Instead, we’ll handle most of our logic within the “mempool_watching” function and just borrow some useful concepts from Artemis.

Now, update our “mempool_watching” function (trace.rs):

pub async fn mempool_watching(target_address: String) -> Result<()> {
    // Setup...

    // Step #1...

    // Step #2: Stream data asynchronously
    let (event_sender, _): (Sender<Event>, _) = broadcast::channel(512);

    let mut set = JoinSet::new();

    // !! add this !! 🔻
    // Stream new headers
    // Create a new scope and think of this as a separate function.
    // I've put all the logic into mempool_watching so that it's easier
    // to follow along
    {
        let provider = provider.clone();
        let event_sender = event_sender.clone();

        set.spawn(async move {
            let stream = provider.subscribe_blocks().await.unwrap();
            let mut stream = stream.filter_map(|block| match block.number {
                Some(number) => Some(NewBlock {
                    number,
                    gas_used: block.gas_used,
                    gas_limit: block.gas_limit,
                    base_fee_per_gas: block.base_fee_per_gas.unwrap_or_default(),
                    timestamp: block.timestamp,
                }),
                None => None,
            });

            while let Some(block) = stream.next().await {
                match event_sender.send(Event::NewBlock(block)) {
                    Ok(_) => {}
                    Err(_) => {}
                }
            }
        });
    }
}

I’ve created a separate scope for this part of Step #2. In it, I spawn a new Tokio task on the JoinSet that asynchronously receives new block headers. We don’t use the raw block data we receive; instead, we map it to the NewBlock struct we’ve just defined.

Upon receiving new data, we wrap this NewBlock struct in an Event enum so that we can send it to our event handler. We will define the handler after we’re done with the two async streams.

Mempool stream

Update our “mempool_watching” function again (trace.rs):

pub async fn mempool_watching(target_address: String) -> Result<()> {
    // Setup...

    // Step #1...

    // Step #2: Stream data asynchronously
    let (event_sender, _): (Sender<Event>, _) = broadcast::channel(512);

    let mut set = JoinSet::new();

    // Stream new headers...

    // !! add this !! 🔻
    // Stream pending transactions
    // https://github.com/gakonst/ethers-rs/blob/master/ethers-providers/src/stream/tx_stream.rs
    {
        let provider = provider.clone();
        let event_sender = event_sender.clone();

        set.spawn(async move {
            let stream = provider.subscribe_pending_txs().await.unwrap();
            let mut stream = stream.transactions_unordered(256).fuse();

            while let Some(result) = stream.next().await {
                match result {
                    Ok(tx) => match event_sender.send(Event::Transaction(tx)) {
                        Ok(_) => {}
                        Err(_) => {}
                    },
                    Err(_) => {}
                };
            }
        });
    }
}

This looks very similar to what we did when we spawned a Tokio task to start streaming new headers data.

Here, we spawn another Tokio task, but this time, we subscribe to pending transactions, wrap the resulting data into an Event enum just like we did with NewBlock, and using event_sender, send that Event data to our event handler, which we will now define.
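For readers following along in Python, the collector → handler layout can be sketched with asyncio. This is a simplified stand-in rather than a translation of the Rust code. An asyncio.Queue hands each event to a single consumer, whereas Tokio’s broadcast channel delivers every event to every subscriber. The two collectors below emit hard-coded fake data, and all names are hypothetical. In a real bot, they would wrap the newHeads and newPendingTransactions subscriptions.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class NewBlock:
    number: int
    base_fee_per_gas: int


@dataclass
class PendingTx:
    hash: str


async def block_collector(queue: asyncio.Queue) -> None:
    # Hypothetical stand-in for a newHeads subscription
    for number in (100, 101):
        await queue.put(NewBlock(number=number, base_fee_per_gas=30_000_000_000))


async def tx_collector(queue: asyncio.Queue) -> None:
    # Hypothetical stand-in for a newPendingTransactions subscription
    for tx_hash in ("0xaa", "0xbb"):
        await queue.put(PendingTx(hash=tx_hash))


async def event_handler(queue: asyncio.Queue, n_events: int) -> list:
    latest_block = None
    handled = []
    for _ in range(n_events):
        event = await queue.get()
        if isinstance(event, NewBlock):
            latest_block = event  # remember the newest header for gas checks
        elif latest_block is not None:  # skip txs until we have seen a block
            handled.append((latest_block.number, event.hash))
    return handled


async def run() -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=512)
    handler = asyncio.create_task(event_handler(queue, n_events=4))
    await asyncio.gather(block_collector(queue), tx_collector(queue))
    return await handler


if __name__ == "__main__":
    print(asyncio.run(run()))
```

As in the Rust version, transactions that arrive before the first block header are dropped, since there is no gas data to check them against yet.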

Event handler

Now that we’ve created two Tokio tasks that each send an Event enum that contains either the NewBlock data or the Transaction data, we need to create an Event handler processing these events (trace.rs):

pub async fn mempool_watching(target_address: String) -> Result<()> {
    // Setup...

    // Step #1...

    // Step #2: Stream data asynchronously
    let (event_sender, _): (Sender<Event>, _) = broadcast::channel(512);

    let mut set = JoinSet::new();

    // Stream new headers...

    // Stream pending transactions...

    // !! add this !! 🔻
    // Event handler
    {
        let mut event_receiver = event_sender.subscribe();

        set.spawn(async move {
            let mut new_block = NewBlock::default();

            loop {
                match event_receiver.recv().await {
                    Ok(event) => match event {
                        Event::NewBlock(block) => {
                            new_block = block;
                            info!("{:?}", new_block);
                        }
                        Event::Transaction(tx) => {
                            if new_block.number != U64::zero() {
                                let next_base_fee = calculate_next_block_base_fee(
                                    new_block.gas_used,
                                    new_block.gas_limit,
                                    new_block.base_fee_per_gas,
                                );

                                // max_fee_per_gas has to be greater than next block's base fee
                                if tx.max_fee_per_gas.unwrap_or_default()
                                    > U256::from(next_base_fee)
                                {
                                    // 🛑 we haven't defined trace_state_diff yet, we will
                                    // so this will error at this point
                                    match trace_state_diff(
                                        provider.clone(),
                                        &tx,
                                        new_block.number,
                                        &pools,
                                        target_address.clone(),
                                    )
                                    .await
                                    {
                                        Ok(_) => {}
                                        Err(_) => {}
                                    }
                                }
                            }
                        }
                    },
                    Err(_) => {}
                }
            }
        });
    }

    while let Some(res) = set.join_next().await {
        info!("{:?}", res);
    }

    Ok(())
}

Again, I create a separate scope for this event handler.

Unlike the above two async data streams, this time, we create an event_receiver by subscribing to the event_sender.

The newly spawned event handler task will run an infinite loop and handle each event as below:

  • Event::NewBlock: store the block in new_block so that the Transaction branch always has the latest block’s gas data (gas used, gas limit, and base fee). It needs this data to estimate the next block’s base fee.
  • Event::Transaction: we check that the pending transaction we are looking at has set the max_fee_per_gas greater than next block’s base fee. Any transaction that has a lower value will not get added to the new block, so we simply skip. If the condition is met though, we call trace_state_diff function, which we will define now.
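The fee check in the Transaction branch can be sketched in Python as below. One behavior differs from the Rust code on purpose. Here, a legacy transaction (no max_fee_per_gas) falls back to its gas_price. In the Rust handler, unwrap_or_default() turns a missing max_fee_per_gas into zero, so legacy transactions are skipped there. should_trace is a hypothetical helper.

```python
from typing import Optional


def should_trace(max_fee_per_gas: Optional[int], next_base_fee: int,
                 gas_price: Optional[int] = None) -> bool:
    """Return True if a pending tx's fee cap beats the estimated next base fee."""
    # EIP-1559 txs carry a fee cap; legacy txs only have gas_price (fallback is an assumption)
    fee_cap = max_fee_per_gas if max_fee_per_gas is not None else gas_price
    # Strict comparison mirrors the Rust handler; EIP-1559 itself only requires >=
    return fee_cap is not None and fee_cap > next_base_fee
```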

Before we define our trace function, we loop over the JoinSet, which now holds three Tokio tasks. The tasks start running as soon as they are spawned. Awaiting join_next keeps mempool_watching alive until they finish and logs each task’s result.

And don’t forget to return a Result<()> value, Ok(()).

Trace function

We are finally at the last piece of our puzzle. We’ll define trace_state_diff function that we called from our event handler (trace.rs):

// imports...

#[derive(Default, Debug, Clone)]
pub struct NewBlock {
    pub number: U64,
    pub gas_used: U256,
    pub gas_limit: U256,
    pub base_fee_per_gas: U256,
    pub timestamp: U256,
}

#[derive(Debug, Clone)]
pub enum Event {
    NewBlock(NewBlock),
    Transaction(Transaction),
}

// !! add this !! 🔻
async fn trace_state_diff(
    provider: Arc<Provider<Ws>>,
    tx: &Transaction,
    block_number: U64,
    pools: &DashMap<H160, Pool>,
    target_address: String,
) -> Result<()> {
    // keep empty, we'll fill it up

    Ok(())
}


pub async fn mempool_watching(target_address: String) -> Result<()> {
    // function definition here...
}

We’ve added the function signature, let’s fill our function (trace.rs):

async fn trace_state_diff(
    provider: Arc<Provider<Ws>>,
    tx: &Transaction,
    block_number: U64,
    pools: &DashMap<H160, Pool>,
    target_address: String,
) -> Result<()> {
    // !! add this !! 🔻
    info!(
        "Tx #{} received. Checking if it touches: {}",
        tx.hash, target_address
    );

    let target_address: Address = target_address.parse().unwrap();

    let state_diff = provider
        .trace_call(
            tx,
            vec![TraceType::StateDiff],
            Some(BlockNumber::from(block_number)),
        )
        .await?
        .state_diff
        .ok_or(anyhow!("state diff does not exist"))?
        .0;

    Ok(())
}

First, we log the transaction hash and parse target_address from a String into an Address for use in later operations.

Next, we call trace_call, passing it the pending transaction itself, a vector of trace types, and an optional block number. Because the transaction hasn’t been mined yet, we send its full body rather than a hash, and the node executes it on top of the given block’s state. The response may or may not contain a state_diff field, so we use ok_or to turn a missing field into an error. If the field exists, we take the inner value out with .0, as StateDiff is defined as the below tuple struct:

Let’s continue (trace.rs):

async fn trace_state_diff(
    provider: Arc<Provider<Ws>>,
    tx: &Transaction,
    block_number: U64,
    pools: &DashMap<H160, Pool>,
    target_address: String,
) -> Result<()> {
    info!(
        "Tx #{} received. Checking if it touches: {}",
        tx.hash, target_address
    );

    let target_address: Address = target_address.parse().unwrap();

    let state_diff = provider
        .trace_call(
            tx,
            vec![TraceType::StateDiff],
            Some(BlockNumber::from(block_number)),
        )
        .await?
        .state_diff
        .ok_or(anyhow!("state diff does not exist"))?
        .0;

    // !! add this !! 🔻
    let touched_pools: Vec<Pool> = state_diff
        .keys()
        .filter_map(|addr| pools.get(addr).map(|p| (*p.value()).clone()))
        .filter(|p| match p {
            Pool::UniswapV2(pool) => vec![pool.token_a, pool.token_b].contains(&target_address),
            Pool::UniswapV3(pool) => vec![pool.token_a, pool.token_b].contains(&target_address),
        })
        .collect();

    if touched_pools.is_empty() {
        return Ok(());
    }

    let target_storage = &state_diff
        .get(&target_address)
        .ok_or(anyhow!("no target storage"))?
        .storage;

    Ok(())
}

This time, we look at all the keys from state_diff (BTreeMap<H160, AccountDiff>), by first looking at whether the address exists in pools — a DashMap data we created in mempool_watching function using cfmms. If it does, we match it to see if the pool data is from UniswapV2 or UniswapV3. The current project only retrieves data from UniswapV3, but using cfmms, we can sync pairs for both variants. We also check whether the filtered pool is a pair that uses WETH as one of its tokens.

Next, we return early if touched_pools doesn’t have any pools in it, meaning that the pending transaction doesn’t touch any WETH related Uniswap pools.

However, if the transaction touches any WETH pools, we look up the storage diff of the WETH contract itself. A pool’s WETH balance isn’t stored in the pool; it lives in WETH’s balanceOf mapping, so that is where a balance change shows up. We’ll see how to read it now.
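The same filtering step can be written in Python. This is a minimal sketch with two assumptions:

- pools is a plain dict from lowercase pool address to its (token_a, token_b) pair. It stands in for the DashMap of cfmms Pool values.
- state_diff is the Parity style stateDiff object keyed by address.

touched_target_pools is a hypothetical helper.

```python
def touched_target_pools(state_diff: dict, pools: dict, target: str) -> list:
    """Return addresses of known pools that appear in state_diff and hold `target` as a token."""
    target = target.lower()
    touched = []
    for address in state_diff:
        tokens = pools.get(address.lower())
        if tokens and target in (token.lower() for token in tokens):
            touched.append(address.lower())
    return touched
```

An empty result corresponds to the early return in the Rust code: the transaction doesn’t touch any WETH pool we know about.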

Update the function once more (trace.rs):

async fn trace_state_diff(
    provider: Arc<Provider<Ws>>,
    tx: &Transaction,
    block_number: U64,
    pools: &DashMap<H160, Pool>,
    target_address: String,
) -> Result<()> {
    info!(
        "Tx #{} received. Checking if it touches: {}",
        tx.hash, target_address
    );

    let target_address: Address = target_address.parse().unwrap();

    let state_diff = provider
        .trace_call(
            tx,
            vec![TraceType::StateDiff],
            Some(BlockNumber::from(block_number)),
        )
        .await?
        .state_diff
        .ok_or(anyhow!("state diff does not exist"))?
        .0;

    let touched_pools: Vec<Pool> = state_diff
        .keys()
        .filter_map(|addr| pools.get(addr).map(|p| (*p.value()).clone()))
        .filter(|p| match p {
            Pool::UniswapV2(pool) => vec![pool.token_a, pool.token_b].contains(&target_address),
            Pool::UniswapV3(pool) => vec![pool.token_a, pool.token_b].contains(&target_address),
        })
        .collect();

    if touched_pools.is_empty() {
        return Ok(());
    }

    let target_storage = &state_diff
        .get(&target_address)
        .ok_or(anyhow!("no target storage"))?
        .storage;

    // !! add this !! 🔻
    for pool in &touched_pools {
        // Storage key of balanceOf[pool]: keccak256(pool padded to 32 bytes ++ slot 3)
        let slot = H256::from(keccak256(abi::encode(&[
            abi::Token::Address(pool.address()),
            abi::Token::Uint(U256::from(3)),
        ])));

        if let Some(Diff::Changed(c)) = target_storage.get(&slot) {
            let from = U256::from(c.from.to_fixed_bytes());
            let to = U256::from(c.to.to_fixed_bytes());

            if to > from {
                // if to > from, the pool's <target_token> balance has increased,
                // most likely from a swap: <target_token> -> token
                info!(
                    "(Tx #{}) Balance change: {} -> {} @ Pool {}",
                    tx.hash,
                    from,
                    to,
                    pool.address()
                );
            }
        }
    }

    Ok(())
}

Before we look at this code, take a quick look at the WETH contract:

We’re interested in the balanceOf mapping, which is in slot number 3. You can easily figure out that the storage layout of WETH is as follows:

  • 0: name
  • 1: symbol
  • 2: decimals
  • 3: balanceOf

The way we derive a storage slot of a mapping value is by:

keccak256(mapping key padded to 32 bytes + the mapping’s slot number as a 32-byte value, which is 3 in this case)

https://steveng.medium.com/ethereum-virtual-machine-storage-layout-beb9a72a07e9

This is done using Rust as:

let slot = H256::from(keccak256(abi::encode(&[
    abi::Token::Address(pool.address()),
    abi::Token::Uint(U256::from(3)),
])));
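The same derivation can be checked outside Rust. Python’s hashlib.sha3_256 is the finalized SHA-3, whose padding differs from the Keccak-256 that Ethereum uses. To stay dependency-free, this minimal sketch therefore carries a small, slow, pure-Python Keccak-256. In practice, you would reach for a library such as pycryptodome or eth-hash. balance_slot is a hypothetical helper; 3 is the WETH balanceOf slot discussed above.

```python
MASK = (1 << 64) - 1
RC = [
    0x0000000000000001, 0x0000000000008082, 0x800000000000808A, 0x8000000080008000,
    0x000000000000808B, 0x0000000080000001, 0x8000000080008081, 0x8000000000008009,
    0x000000000000008A, 0x0000000000000088, 0x0000000080008009, 0x000000008000000A,
    0x000000008000808B, 0x800000000000008B, 0x8000000000008089, 0x8000000000008003,
    0x8000000000008002, 0x8000000000000080, 0x000000000000800A, 0x800000008000000A,
    0x8000000080008081, 0x8000000000008080, 0x0000000080000001, 0x8000000080008008,
]
ROT = [[0, 36, 3, 41, 18], [1, 44, 10, 45, 2], [62, 6, 43, 15, 61],
       [28, 55, 25, 21, 56], [27, 20, 39, 8, 14]]  # rotation offsets, ROT[x][y]


def _rotl(v: int, n: int) -> int:
    return ((v << n) | (v >> (64 - n))) & MASK


def _keccak_f(a):
    """Keccak-f[1600] permutation on a 5x5 grid of 64-bit lanes, indexed a[x][y]."""
    for rc in RC:
        c = [a[x][0] ^ a[x][1] ^ a[x][2] ^ a[x][3] ^ a[x][4] for x in range(5)]  # theta
        d = [c[(x - 1) % 5] ^ _rotl(c[(x + 1) % 5], 1) for x in range(5)]
        a = [[a[x][y] ^ d[x] for y in range(5)] for x in range(5)]
        b = [[0] * 5 for _ in range(5)]
        for x in range(5):  # rho and pi
            for y in range(5):
                b[y][(2 * x + 3 * y) % 5] = _rotl(a[x][y], ROT[x][y])
        a = [[b[x][y] ^ (~b[(x + 1) % 5][y] & b[(x + 2) % 5][y]) for y in range(5)]
             for x in range(5)]  # chi
        a[0][0] ^= rc  # iota
    return a


def keccak256(data: bytes) -> bytes:
    rate = 136  # bytes absorbed per permutation for Keccak-256
    padded = bytearray(data) + b"\x01"  # original Keccak padding, not SHA-3's 0x06
    padded += b"\x00" * (-len(padded) % rate)
    padded[-1] |= 0x80
    state = [[0] * 5 for _ in range(5)]
    for off in range(0, len(padded), rate):
        block = padded[off:off + rate]
        for i in range(rate // 8):
            state[i % 5][i // 5] ^= int.from_bytes(block[8 * i:8 * i + 8], "little")
        state = _keccak_f(state)
    return b"".join(state[i % 5][i // 5].to_bytes(8, "little") for i in range(4))


def balance_slot(holder: str, mapping_slot: int = 3) -> str:
    """Storage key of balanceOf[holder] for a mapping declared at `mapping_slot`."""
    key = bytes(12) + bytes.fromhex(holder.removeprefix("0x"))  # address left-padded to 32 bytes
    return "0x" + keccak256(key + mapping_slot.to_bytes(32, "big")).hex()
```

The resulting hex string is the storage key to look up under the WETH address in a Parity style stateDiff.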

Now, recall that Parity style storage states were returned as:

So, in Rust, we look up this slot in WETH’s storage diff and read its from and to values.

Lastly, if the to value is greater than the from value, the WETH balance of the Uniswap pool has increased. This most likely means a user is swapping WETH for another token (adding liquidity would also increase it). We print this to the console.

Conclusion

We’re finally done writing our small project for doing EVM tracing using Rust. We see that running: cargo run works like magic:

What to expect in the next part?

In this post, we looked at how we can use EVM Tracing to monitor storage changes of a pending transaction. This was very convenient for filtering out transactions that we might be interested in for our strategies.

In the next post, we are going to use REVM to take a step further and simulate a function call in our customized EVM setup. This will enable us to figure out:

  • whether the token we are trading is taxed or toxic,
  • whether we are exposed to Salmonella attacks,
  • simulate swaps on any DEX without having to understand the math behind their AMM systems.

revm (https://github.com/bluealloy/revm) is currently used in major projects such as: Foundry, Helios, Hardhat, Reth, Arbiter.

If this sounds interesting to you, give this post a clap, and subscribe to get notified on the next post right away.

I’ll see you in the next one! 😀
