Caching Ethereum events with MySQL
In this article, I am going to demonstrate a simple approach to caching Ethereum events. I won’t describe what events are, since plenty of articles already cover that topic. I’ll just say that we typically use events for off-chain operations, for example tracking a token’s transfers or retrieving a filtered list of particular transactions, much like a good old SQL query.
Let’s suppose we want to make a website that tracks some token transfers, a kind of Etherscan. We will definitely need simple operations such as:
- get all the token transfers
- get transfers made from a particular Ethereum address
- get transfers made to a particular Ethereum address
- get transfers that are above or below a particular amount
- get transfers within a particular time frame
What web3 currently offers is the getPastEvents method, which is used like this:
let events = await contract.getPastEvents(
"Transfer",
{
filter: {from:'0x0123456789abcdef0123456789abcdef01234567'},
fromBlock: 0,
toBlock: 'latest'
}
);
The main issue with this approach is that it can get slow as the blockchain grows, especially if you don’t run your own Ethereum node and rely on public providers like Infura or MyEtherApi.
The next issue is that tricky queries are almost impossible to implement, because the filter object’s functionality is quite limited.
Besides, events already written to the blockchain can’t be changed; new records are only added over time. This makes events a perfect target for caching.
Database choice
In this example, we’ll use MySQL as the database for holding our event records. MySQL can store raw JSON and then build queries on that JSON object’s properties as if they were ordinary SQL columns.
What should we store?
Let’s take a closer look at the result of the getPastEvents method to see what data we are working with. I took some Binance coin transfers as an example. Each event object has the following structure:
{
"address": "0xB8c77482e45F1F44dE1745F52C74426C631bDD52",
"blockHash": "0x19e0d4c4cce0ed7c429b627fc6c5cc5c223c2e9218e233ab2b72e64e817cfcc2",
"blockNumber": 6813922,
"logIndex": 111,
"removed": false,
"transactionHash": "0x32d660785112b084135e0d4d2b53c0d67e851b735eacb486e44e52b7945b857d",
"transactionIndex": 84,
"id": "log_5ea90f71",
"returnValues": {
"0": "0x6ACe7E0abCF0dA3097Fa7155149dccd51E20EF82",
"1": "0xAc951701644936aA95C80ED9f358Fa28f8369075",
"2": "1000553200000000000",
"from": "0x6ACe7E0abCF0dA3097Fa7155149dccd51E20EF82",
"to": "0xAc951701644936aA95C80ED9f358Fa28f8369075",
"value": "1000553200000000000"
},
"event": "Transfer",
"signature": "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"raw": {
"data": "0x0000000000000000000000000000000000000000000000000de2add590e16000",
"topics": [
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
"0x0000000000000000000000006ace7e0abcf0da3097fa7155149dccd51e20ef82",
"0x000000000000000000000000ac951701644936aa95c80ed9f358fa28f8369075"
]
}
}
As you can see, the event arguments are stored in the returnValues property. blockNumber, transactionHash and logIndex might be useful too, as I’ll show later.
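A side note on value: web3 returns it as a decimal string, because token amounts easily exceed JavaScript’s safe integer range. The following is a minimal sketch, assuming a Node.js version with BigInt support (newer than the 8.4.0 used in this article). It checks that the string in returnValues.value is the same number that is hex-encoded in raw.data. The sampleEvent object is a trimmed copy of the event above, and the variable names are only illustrative.

```javascript
// Trimmed copy of the sample event shown above
const sampleEvent = {
  returnValues: { value: "1000553200000000000" },
  raw: { data: "0x0000000000000000000000000000000000000000000000000de2add590e16000" }
};

// The amount is too large to be a safe integer in a regular JS number
const fitsInNumber = Number.isSafeInteger(Number(sampleEvent.returnValues.value));

// In this Transfer event the only non-indexed argument is value,
// so raw.data is just that uint256 encoded as 32 bytes of hex
const decoded = BigInt(sampleEvent.raw.data);
const matches = decoded === BigInt(sampleEvent.returnValues.value);

console.log(fitsInNumber, matches);
```

Keeping the amount as a string all the way to the database avoids any silent loss of precision.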
Our goal is to write those JSON objects to the database and to implement easy access methods that can replace standard web3’s getPastEvents method seamlessly.
Here is the SQL script for creating the Transfer table.
CREATE TABLE `eth_cache`.`transfer` (
`id` INT NOT NULL AUTO_INCREMENT,
`json` JSON NOT NULL,
`from` VARCHAR(45) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.returnValues.from'))) VIRTUAL,
`to` VARCHAR(45) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.returnValues.to'))) VIRTUAL,
-- a uint256 can have up to 78 decimal digits
`value` VARCHAR(78) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.returnValues.value'))) VIRTUAL,
`txHash` VARCHAR(66) GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.transactionHash'))) VIRTUAL,
`logIndex` INT GENERATED ALWAYS AS (json_unquote(json_extract(`json`,'$.logIndex'))),
PRIMARY KEY (`id`),
UNIQUE INDEX `IDX_UNIQUE` (`txHash` ASC, `logIndex` ASC));
Some important things to explain:
- The json column is created with the JSON type. This allows us to define auto-generated columns using a special syntax.
- from, to and value are auto-generated columns. The expression might seem complex at first, but it is really simple. For example, the from column equals the returnValues.from property of the object stored in the json column.
- txHash and logIndex. Combined, these properties identify every event object. We need them to build a unique index for a row, which prevents accidental duplication of events.
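To make the mapping concrete, here is a small JavaScript sketch of what the generated columns compute on the MySQL side. The helpers extractColumns and uniqueKey are hypothetical names introduced only for illustration; they are not part of the scanner itself.

```javascript
// Hypothetical helper: computes in JavaScript what the generated columns
// of the transfer table compute in MySQL from the stored JSON
function extractColumns(event) {
  return {
    from: event.returnValues.from,
    to: event.returnValues.to,
    value: event.returnValues.value,
    txHash: event.transactionHash,
    logIndex: event.logIndex
  };
}

// Hypothetical helper: the pair of values that the unique index
// (txHash, logIndex) requires to be distinct for every row
function uniqueKey(event) {
  const columns = extractColumns(event);
  return `${columns.txHash}:${columns.logIndex}`;
}

// Trimmed copy of the sample event from this article
const exampleEvent = {
  transactionHash: "0x32d660785112b084135e0d4d2b53c0d67e851b735eacb486e44e52b7945b857d",
  logIndex: 111,
  returnValues: {
    from: "0x6ACe7E0abCF0dA3097Fa7155149dccd51E20EF82",
    to: "0xAc951701644936aA95C80ED9f358Fa28f8369075",
    value: "1000553200000000000"
  }
};

console.log(extractColumns(exampleEvent));
console.log(uniqueKey(exampleEvent));
```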
Optionally, we could also add a database index to improve performance, for example on the to column:
ALTER TABLE `eth_cache`.`transfer`
ADD INDEX `IDX_TO` (`to` ASC);
Implementation
Prerequisites
1. Node.js. I use version 8.4.0.
2. The web3 npm package to interact with the blockchain. We need the specific version 1.0.0-beta.35. Using the latest version, beta.36, resulted in the ‘Returned values aren’t valid, did it run Out of Gas’ error when trying to retrieve some events.
npm install web3@1.0.0-beta.35 --save
3. To work with a MySQL database from JavaScript, we should install the mysql package:
npm install mysql --save
4. And the last one is a MySQL server. It is worth mentioning that we’ll use MySQL 5.7, as the latest 8.0 version doesn’t seem to be compatible with the mysql package: it gave me the error ER_NOT_SUPPORTED_AUTH_MODE while trying to connect, most likely because 8.0 switched its default authentication plugin to caching_sha2_password.
Interacting with MySQL
We’ll use a connection pool to make queries in this example.
const mysql = require('mysql');
let pool = mysql.createPool({
connectionLimit: <connection limit>,
host: <database server address>,
user: <user>,
password: <password>,
database: <name of your database schema>
});
It is more convenient to use a promisified version of the query method:
const util = require('util');
pool.query = util.promisify(pool.query);
Now we can use the following code to insert a record into the transfer table created before.
async function writeEvent(event) {
try {
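// the ? placeholder lets the driver escape the JSON string,
// so quotes inside it can't break the statement
await pool.query(
'INSERT INTO `transfer` (`json`) VALUES (?)',
[JSON.stringify(event)]
);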
} catch(e) {
//if it's 'duplicate record' error, do nothing;
// otherwise rethrow
if(e.code != 'ER_DUP_ENTRY') {
throw e;
}
}
}
Here we also guard against inserting duplicate rows. We don’t want to do anything special in that case: most likely we’ve already written those events earlier. So we simply treat this kind of exception as handled.
Base caching function
Let’s construct a contract object to retrieve events from (web3 here is an already initialized Web3 instance).
let contract = new web3.eth.Contract(abi, <contractAddress>);
We can include only the Transfer event interface in the abi parameter, like this:
let abi = [{
"anonymous": false,
"inputs": [
{ "indexed": true, "name": "from", "type": "address" },
{ "indexed": true, "name": "to", "type": "address" },
{ "indexed": false, "name": "value", "type": "uint256" }
],
"name": "Transfer",
"type": "event"
}];
This is the base version of the caching function. First, we get event objects, then write them to the database, one by one.
async function cacheEvents(fromBlock, toBlock) {
let events = await contract.getPastEvents(
"Transfer",
{ filter: {}, fromBlock: fromBlock, toBlock: toBlock }
);
// for...of iterates over the event objects (for...in would yield array indexes)
for(let event of events) {
await writeEvent(event);
}
}
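Writing events one by one means one database round trip per event. As a possible alternative, here is a minimal sketch of a helper that builds a single multi-row statement for a whole batch. The name buildBulkInsert is hypothetical, and the sketch swaps in MySQL’s INSERT IGNORE instead of catching ER_DUP_ENTRY, which is an assumption about what you want: it skips duplicate rows silently, but it also turns some other errors into warnings.

```javascript
// Hypothetical helper: builds one multi-row INSERT for a batch of events.
// INSERT IGNORE makes MySQL skip rows that violate the unique index.
function buildBulkInsert(events) {
  if (events.length === 0) {
    return null; // nothing to write
  }
  // one "(?)" group per event, each filled with that event's JSON string
  const placeholders = events.map(() => '(?)').join(', ');
  return {
    sql: 'INSERT IGNORE INTO `transfer` (`json`) VALUES ' + placeholders,
    params: events.map(event => JSON.stringify(event))
  };
}

// Two dummy events, just to show the shape of the result
const batch = buildBulkInsert([{ logIndex: 1 }, { logIndex: 2 }]);
console.log(batch.sql);
console.log(batch.params);
```

With the promisified pool from above, the result could be passed on as pool.query(batch.sql, batch.params) whenever the helper returns a non-null value.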
Regular blockchain scanning
Now let’s expand this into a simple background script that constantly scans the blockchain for emitted events.
Some utility functions:
const timeout = 30;
function sleep(milliseconds) {
return new Promise(resolve =>
setTimeout(resolve, milliseconds)
);
}
async function poll (fn) {
// loop instead of recursing, so the chain of pending promises doesn't grow forever
while (true) {
await fn();
await sleep(timeout*1000);
}
}
The first one is a simple promise-based wrapper around setTimeout that can be used with async/await. The second one makes infinite periodic calls of fn, the worker function.
With these helper functions, our background scanner looks quite simple:
async function scan() {
const MaxBlockRange = 500000;
let latestCachedBlock = 0; // latest block written to database
let latestEthBlock = 0; // latest block in blockchain
await poll(async () => {
try {
//get latest block written to the blockchain
latestEthBlock = await web3.eth.getBlockNumber();
//divide huge block ranges to smaller chunks,
// of say 500000 blocks max
latestEthBlock = Math.min(
latestEthBlock,
latestCachedBlock + MaxBlockRange
);
//if there are blocks we haven't scanned yet, search for events
//(>= so that a single new block is not skipped until the next one appears)
if(latestEthBlock >= latestCachedBlock) {
await cacheEvents(latestCachedBlock, latestEthBlock);
//if everything is OK, update cached block value
latestCachedBlock = latestEthBlock + 1;
}
} catch (e) {
//we might want to add some simple logging here
console.log(e.toString());
}
});
}
Let me explain that ‘latestEthBlock + 1’ thing. Web3’s getPastEvents returns the events written within the [fromBlock, toBlock] range, including both borders. So without this increment, the next cacheEvents call would again return the events from latestEthBlock as part of the result.
Although duplicate events won’t be inserted into the database thanks to the unique index, we still don’t want to do this excess work.
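The range arithmetic is easy to get wrong by one block, so here is a small self-contained sketch of it. The helper nextRange is a hypothetical name; it takes the next block to scan, the current chain head and the maximum chunk size, and mirrors the Math.min and + 1 logic of the scanner.

```javascript
// Hypothetical helper: returns the inclusive block range to request next,
// or null when the chain head has not moved past what we already scanned.
function nextRange(nextBlock, headBlock, maxRange) {
  if (headBlock < nextBlock) {
    return null;
  }
  const toBlock = Math.min(headBlock, nextBlock + maxRange);
  // both borders are inclusive, so the following scan starts at toBlock + 1
  return { fromBlock: nextBlock, toBlock: toBlock, nextBlock: toBlock + 1 };
}

// Walk from block 0 up to block 1200000 with a chunk limit of 500000
const ranges = [];
let next = 0;
let range;
while ((range = nextRange(next, 1200000, 500000)) !== null) {
  ranges.push([range.fromBlock, range.toBlock]);
  next = range.nextBlock;
}
console.log(ranges);
// → [ [ 0, 500000 ], [ 500001, 1000001 ], [ 1000002, 1200000 ] ]
```

No block appears in two ranges, and no block between 0 and the head is left out.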
This implementation should be quite enough for a simple background scanner. However, there is always room for improvement, and we’ll return to that a bit later. For now, let’s take a quick look at what we can do with that data.
Retrieving the events
Here is an example of a function that selects transfers made from a particular address:
async function selectTransfersFrom(sender) {
// the ? placeholder makes the driver escape the address
return await pool.query('select json from transfer t where t.from = ?', [sender]);
}
We query the database using the generated from column. The most notable part here is that each returned row carries, in its json field, the same object that web3’s getPastEvents produced (depending on the driver, it may arrive as a JSON string that needs JSON.parse). That makes refactoring existing code a lot easier.
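The other operations from the list at the beginning of the article can be built the same way. Below is a minimal sketch of a query builder that combines optional filters with placeholders. The name buildTransferQuery and the shape of the filters object are assumptions made for illustration. Because value is stored as text, the amount comparison casts both sides to DECIMAL(65,0), the widest decimal type MySQL offers, so amounts longer than 65 digits would not fit.

```javascript
// Hypothetical helper: builds a parameterized SELECT from optional filters.
// Column names come from the transfer table defined earlier.
function buildTransferQuery(filters) {
  const conditions = [];
  const params = [];
  if (filters.from) {
    conditions.push('t.`from` = ?');
    params.push(filters.from);
  }
  if (filters.to) {
    conditions.push('t.`to` = ?');
    params.push(filters.to);
  }
  if (filters.minValue) {
    // value is stored as text, so compare it numerically
    conditions.push('CAST(t.`value` AS DECIMAL(65,0)) >= CAST(? AS DECIMAL(65,0))');
    params.push(filters.minValue);
  }
  const where = conditions.length > 0 ? ' WHERE ' + conditions.join(' AND ') : '';
  return { sql: 'SELECT `json` FROM `transfer` t' + where, params: params };
}

const query = buildTransferQuery({
  to: '0xAc951701644936aA95C80ED9f358Fa28f8369075',
  minValue: '1000000000000000000'
});
console.log(query.sql);
console.log(query.params);
```

The result can then be executed with pool.query(query.sql, query.params).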
Further improvements
The event object contains a lot of properties that might be totally useless for your application. It is better to remove the excess before writing to the database, which saves a lot of space.
async function writeEvent(event) {
try {
delete event.raw;
delete event.event;
delete event.blockHash;
delete event.type;
delete event.id;
delete event.signature;
// the ? placeholder lets the driver escape the JSON string
await pool.query(
'INSERT INTO `transfer` (`json`) VALUES (?)',
[JSON.stringify(event)]
);
} catch(e) {
// if it's 'duplicate record' error, do nothing,
// otherwise rethrow
if(e.code != 'ER_DUP_ENTRY') {
throw e;
}
}
}
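Note that delete modifies the event object that was passed in. If the caller still needs the full object afterwards, a non-mutating variant might be preferable. Here is a minimal sketch; slimEvent and EXCESS_PROPERTIES are hypothetical names, and the list of properties is the same one used above.

```javascript
// Properties we don't want to store (same list as in writeEvent above)
const EXCESS_PROPERTIES = ['raw', 'event', 'blockHash', 'type', 'id', 'signature'];

// Hypothetical helper: returns a shallow copy of the event without the
// excess properties, leaving the original object untouched
function slimEvent(event) {
  const copy = Object.assign({}, event);
  for (const key of EXCESS_PROPERTIES) {
    delete copy[key];
  }
  return copy;
}

const fullEvent = {
  blockNumber: 6813922,
  logIndex: 111,
  id: 'log_5ea90f71',
  event: 'Transfer',
  returnValues: { value: '1000553200000000000' }
};
const slim = slimEvent(fullEvent);
console.log(Object.keys(slim));
console.log(Object.keys(fullEvent));
```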
As you might also have noticed, the current version of the scanner starts from block #0 each time it is restarted. While scanning all the way to the current block, it tries to insert duplicate records into the database. We can eliminate that excess work by querying the database for the latest cached block.
It would also be nice to start scanning not from block #0, but at least from the block in which the contract was deployed. For simplicity, you can look this up on etherscan.io.
async function getLatestCachedBlock() {
const defaultInitialBlock = <your contract’s deployment block>;
let dbResult = await pool.query(
'select json_unquote(json_extract(`json`,\'$.blockNumber\')) \
as block from transfer order by id desc limit 1'
);
return dbResult.length > 0 ?
parseInt(dbResult[0].block, 10) :
defaultInitialBlock;
}
Here we again use the MySQL JSON functions to get the blockNumber property of the event object.
Then replace the old piece of the scan function
let latestCachedBlock = 0;
with the new one
let latestCachedBlock = await getLatestCachedBlock();
Conclusion
We’ve created a simple but working event scanner that continuously caches events into a MySQL database. If you have any questions, please feel free to contact me and I’ll try to answer.
The complete source code is available at https://github.com/olekon/p1_eth_caching.