Ethereum Event Indexer

touqeer shah
Coinmonks
Dec 8, 2022


Anyone who has worked with Ethereum for a while knows that events are a very important part of smart contracts. Whenever a transaction happens, we typically emit an event at the end of the function call. The event carries important data that helps us understand what happened inside the transaction, and we can index that data off-chain for analytics or notifications.

Etherscan Output

Indexing

Before starting, we have to understand what indexing is. It is a core database concept: the database keeps an auxiliary, sorted structure on one or more fields so that future queries on those fields run faster and the database is used more efficiently.

The question is why we need indexing for blockchain data at all. Running rich queries directly against the blockchain is slow and costly, so instead we extract the logs (event data), store them off-chain, and index them there, which lets us query the data far more effectively.

https://www.javatpoint.com/indexing-in-dbms

SubGraph

A subgraph (on The Graph protocol) is one of the popular ways for projects to fetch event log data, index it, and expose it through GraphQL for querying.

But as developers we should understand how these logs actually work: how a subgraph knows which fields of the data need to be indexed, which event was emitted, and how to decode the raw data into a valid format.

How can we create our own event handler to index data?

Today I am going to explain how to read event logs, fetch them from the chain, decode them, and index them.

To follow along, you should have a basic understanding of the following:

  1. Node.js
  2. PostgreSQL
  3. Web3.js
  4. Contract ABI
  5. Etherscan

Code : https://github.com/touqeerShah/Ethereum-Event-Indexer

Contract ABI:

The ABI is a very important part of the contract, and of this project. It tells us which functions and events the contract exposes, which parameters the contract passes in each event, and which of those parameters we have to apply indexing to.

In the following screenshot you can see part of the ABI. It tells us that there is an event named Transfer with three parameters, but only two of them have indexed: true, which means indexing is applied only to those two fields; the remaining field is just stored.

In the project, you will find the ABI under the config folder; copy your own contract ABI into it.

https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/config/abi.js
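For reference, the Transfer entry of a standard ERC-20 ABI looks roughly like the fragment below (shown here as a sketch; the full ABI lives in config/abi.js):

// config/abi.js (excerpt) – the Transfer event entry of a standard ERC-20 ABI
module.exports.contractAbi = [
  {
    anonymous: false,
    inputs: [
      { indexed: true, name: "from", type: "address" },   // indexed -> goes into topics
      { indexed: true, name: "to", type: "address" },     // indexed -> goes into topics
      { indexed: false, name: "value", type: "uint256" }, // not indexed -> goes into data
    ],
    name: "Transfer",
    type: "event",
  },
  // ... other events and functions
];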

Understanding Logs:

How do we know which event was emitted from the logs on Etherscan?
If you look closely at the logs you will find one interesting thing: the first hash value, at index zero of Topics, is actually the keccak256 hash of the event name and its parameter types. You can check it yourself.
The rest of the topics are the indexed parameters of the event; an event can have at most three indexed parameters, and the remaining (non-indexed) parameters show up in the Data column of the log.
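To make this concrete, a raw Transfer log roughly has the shape below (addresses, block number, and transaction hash are made-up placeholders; indexed parameters are left-padded to 32 bytes in topics, and the non-indexed value sits in data):

// Example shape of a raw Transfer(address,address,uint256) log entry (placeholder values)
const exampleLog = {
  address: "0x<contract address>", // the contract that emitted the event
  topics: [
    "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // keccak256("Transfer(address,address,uint256)")
    "0x000000000000000000000000<from address, 20 bytes>", // indexed param 1: from
    "0x000000000000000000000000<to address, 20 bytes>",   // indexed param 2: to
  ],
  // non-indexed param: value (here 0x3e8 = 1000), hex-encoded and left-padded to 32 bytes
  data: "0x00000000000000000000000000000000000000000000000000000000000003e8",
  blockNumber: 16132000,    // placeholder
  transactionHash: "0x...", // placeholder
};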

You can test it yourself at the following link with other event names and parameters, but keep one thing in mind: watch out for spaces, otherwise the generated hash will not be the same.

Transfer(address,address,uint256)
Hash link: https://emn178.github.io/online-tools/keccak_256.html
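You can also verify the hash in code; a minimal sketch with ethers.js (which this project already uses) is:

// Compute the event topic hash programmatically (ethers v5)
const { utils } = require("ethers");

// utils.id() is keccak256 of the UTF-8 string – same as the online tool above
const topic0 = utils.id("Transfer(address,address,uint256)");
console.log(topic0);
// -> 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef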

Generate Signature from ABI

Now that we understand how it works, it is time to look at the code and see how we convert the ABI into event signatures.

Code:

var ethers = require("ethers");
var abi = require("../../config/abi"); // first we load the ABI
const fs = require("fs");
var abiSigaturePath = "./../config/event_signature.json";
var utils = ethers.utils;

/**
 * This function reads the ABI and converts each event into its signature.
 * We use the result later to create tables and process transactions.
 */
module.exports.setupABISignature = async () => {
  var allEvent = abi.contractAbi.filter(function (el) {
    // keep only those ABI entries whose type is "event"
    return el.type == "event";
  });

  var eventSingature = {};

  // { "signature": { "Name": "", "fieldsName": [], "fieldsType": [], "Index": ["field1", "field2"] } }
  console.log("Found", allEvent.length, "events in ABI");
  console.log("Start Create Event Signature ....");

  allEvent.forEach((event) => {
    var tem = {};
    var isComma = false;

    var functionSignature = event.name + "(";
    tem["Name"] = event.name;
    tem["fieldsName"] = [];
    tem["fieldsType"] = [];
    tem["Index"] = [];
    event.inputs.forEach((input) => {
      // build the signature-based object for this event
      tem["fieldsName"].push(input.name);
      tem["fieldsType"].push(input.type);
      if (isComma) {
        functionSignature += ",";
      }
      // console.log(isComma, " functionSignature", functionSignature);
      input.indexed ? tem["Index"].push(input.name) : "";
      functionSignature += input.type;
      isComma = true;
    });
    functionSignature += ")";
    console.log("event signature", functionSignature);
    var bytesArrary = utils.toUtf8Bytes(functionSignature);
    var signature = utils.keccak256(bytesArrary); // e.g. Transfer(address,address,uint256) -> topic hash
    eventSingature[signature] = tem; // store all the values related to this event under its hash
  });
  fs.writeFileSync(abiSigaturePath, JSON.stringify(eventSingature));
  // once it is done, write everything to a file so we can reuse it
  console.log("Finish Create Event Signature .... Path", abiSigaturePath);
};

Final Output:
Each hash maps to its event signature object, which helps us match the logs and identify which event was emitted when we load events from the blockchain.

{
  "0x62e78cea01bee320cd4e420270b5ea74000d11b0c9f74754ebdbfc544b05a258": {
    "Name": "Paused",
    "fieldsName": ["account"], // list of all fields
    "fieldsType": ["address"], // their data types
    "Index": []
  },
  "0x5db9ee0a495bf2e6ff9c91a7834c1ba4fdd244a5e8aa4e537bd38aeae4b073aa": {
    "Name": "Unpaused",
    "fieldsName": ["account"],
    "fieldsType": ["address"],
    "Index": []
  },
  "0x6719d08c1888103bea251a4ed56406bd0c3e69723c8a1686e017e7bbe159b6f8": {
    "Name": "PauserAdded",
    "fieldsName": ["account"],
    "fieldsType": ["address"],
    "Index": ["account"] // this tells us which fields to apply indexing to
  },
  "0xcd265ebaf09df2871cc7bd4133404a235ba12eff2041bb89d9c714a2621c7c7e": {
    "Name": "PauserRemoved",
    "fieldsName": ["account"],
    "fieldsType": ["address"],
    "Index": ["account"]
  },
  "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef": {
    "Name": "Transfer",
    "fieldsName": ["from", "to", "value"],
    "fieldsType": ["address", "address", "uint256"],
    "Index": ["from", "to"]
  },
  "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925": {
    "Name": "Approval",
    "fieldsName": ["owner", "spender", "value"],
    "fieldsType": ["address", "address", "uint256"],
    "Index": ["owner", "spender"]
  }
}
Code which generates the event signatures:

https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/event/utils/abi_to_signature.js

Final output:

https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/config/event_signature.json

Generate Table from Event:

Once the ABI is converted into signatures and meaningful metadata, it is time to generate a table for each event so we can store the data and apply indexing to it.

Before creating the tables, make sure your PostgreSQL instance is running and your environment variables are set.

Env file:

POSTGRESSDB_ADDRESS="localhost"
POSTGRESSDB_PORT="5432"
POSTGRESSDB_DB="Event"
POSTGRESSDB_PASS="password"
POSTGRESSDB_USER="admin"
PORT=8080
HOST="localhost"
INFURA_APIKEY="your Infura API key"
CONTRACT_ADDRESS="your deployed contract Address"
CHAIN_ID="1"
CHAIN_NAME="ETHEREUM_MAINNET"
# block number from which to start fetching events
STRARTFROM=
TRANSFER_SIGNATURE="0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
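The config module (config/config.js in the repo) is not shown in this post; a minimal sketch, assuming it just maps these environment variables into the configObj used below, could look like this:

// config/config.js – minimal sketch (assumption: the real file may differ)
require("dotenv").config(); // load the .env file into process.env

module.exports.configObj = {
  CHAIN_ID: process.env.CHAIN_ID,
  CHAIN_NAME: process.env.CHAIN_NAME,
  CONTRACT_ADDRESS: process.env.CONTRACT_ADDRESS,
  STRARTFROM: process.env.STRARTFROM,
  INFURA_APIKEY: process.env.INFURA_APIKEY,
  TRANSFER_SIGNATURE: process.env.TRANSFER_SIGNATURE,
};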

Time to create Tables:

The following code contains constant SQL templates that we fill in according to the event parameters, which keeps the code dynamic. In `basicTableStructure`, almost every event table has some default fields like transaction hash, block number …

So we create a template with those defaults, add the event-specific fields to it, and create the table.

// for tables
module.exports.basicTableStructure = (table, addtionalFields) => {
  return `CREATE TABLE IF NOT EXISTS ${table} (
    "transactionHash" VARCHAR(70) NOT NULL,
    "contractAddress" VARCHAR(70) NOT NULL,
    "blockNumber" Numeric NOT NULL,
    ${addtionalFields}
    PRIMARY KEY ("transactionHash")
  );`;
};
// for indexing on a table
module.exports.basicIndexStructure = (table, indexField) => {
  return `CREATE INDEX ${table}
    ON ${indexField};`;
};
// stores basic details of the event service, e.g. the last block we processed
module.exports.eventListed = `
  CREATE TABLE IF NOT EXISTS "eventListed" (
    "chainId" VARCHAR(5) NOT NULL,
    "chainName" VARCHAR(50) NOT NULL,
    "contractAddress" VARCHAR(50) NOT NULL,
    "startBlockNumber" Numeric NOT NULL,
    "totalAmountTransfer" Numeric NOT NULL,
    PRIMARY KEY ("chainId")
  );`;
module.exports.insertEventListed = insertEventListed = (configObj) => {
  return `INSERT INTO public."eventListed" ("chainId", "chainName", "contractAddress", "startBlockNumber", "totalAmountTransfer")
    VALUES ('${configObj.CHAIN_ID}', '${configObj.CHAIN_NAME}', '${configObj.CONTRACT_ADDRESS}', ${configObj.STRARTFROM}, 0)`;
};

module.exports.getStartingBlock = getStartingBlock = (contractAddress) => {
  return `select el."startBlockNumber" from public."eventListed" el where "contractAddress" like '${contractAddress}'`;
};

module.exports.basicInsertStructure = (table, addtionalFields, values) => {
  return `INSERT INTO ${table} (
    "transactionHash",
    "contractAddress",
    "blockNumber",
    ${addtionalFields}
  ) VALUES (${values})`;
};

module.exports.updateStartBlock = (startBlockNumber, contractAddress) => {
  return `UPDATE public."eventListed"
    SET "startBlockNumber"=${startBlockNumber}
    WHERE "contractAddress" like '${contractAddress}';
  `;
};

module.exports.updateTotalAmountTransfer = (
  newAmountTransfer,
  contractAddress
) => {
  return `UPDATE public."eventListed" el
    SET "totalAmountTransfer"= "totalAmountTransfer"+${newAmountTransfer}
    where "contractAddress" like '${contractAddress}';
  `;
};

module.exports.getTotalAmountTransfer = getTotalAmountTransfer = (
  contractAddress
) => {
  return `select el."totalAmountTransfer" from public."eventListed" el where "contractAddress" like '${contractAddress}'`;
};

module.exports.getSearchTransacationHash = getSearchTransacationHash = (
  transactionHash
) => {
  return `select * from search_columns('${transactionHash}');`;
};

module.exports.setSearchColumeFunction = setSearchColumeFunction = () => {
  return `CREATE OR REPLACE FUNCTION search_columns(
    needle text,
    haystack_tables name[] default '{}',
    haystack_schema name[] default '{}'
  )
  RETURNS table(schemaname text, tablename text, columnname text, rowctid text)
  AS $$
  begin
    FOR schemaname,tablename,columnname IN
      SELECT c.table_schema,c.table_name,c.column_name
      FROM information_schema.columns c
        JOIN information_schema.tables t ON
          (t.table_name=c.table_name AND t.table_schema=c.table_schema)
        JOIN information_schema.table_privileges p ON
          (t.table_name=p.table_name AND t.table_schema=p.table_schema
            AND p.privilege_type='SELECT')
        JOIN information_schema.schemata s ON
          (s.schema_name=t.table_schema)
      WHERE (c.table_name=ANY(haystack_tables) OR haystack_tables='{}')
        AND (c.table_schema=ANY(haystack_schema) OR haystack_schema='{}')
        AND t.table_type='BASE TABLE'
    LOOP
      FOR rowctid IN
        EXECUTE format('SELECT ctid FROM %I.%I WHERE cast(%I as text)=%L',
          schemaname,
          tablename,
          columnname,
          needle
        )
      LOOP
        -- uncomment next line to get some progress report
        -- RAISE NOTICE 'hit in %.%', schemaname, tablename;
        RETURN NEXT;
      END LOOP;
    END LOOP;
  END;
  $$ language plpgsql;`;
};
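To see what these templates actually produce, here is a small usage sketch (not part of the repo); the expected SQL in the comments assumes the Transfer signature generated earlier:

// Quick illustration of the templates above (not part of the repo)
var { basicTableStructure, basicIndexStructure } = require("./constants/constant");

var fields = `"from" VARCHAR(50) NOT NULL,"to" VARCHAR(50) NOT NULL,"value" Numeric NOT NULL,`;
console.log(basicTableStructure("Transfer", fields));
// -> CREATE TABLE IF NOT EXISTS Transfer (
//      "transactionHash" VARCHAR(70) NOT NULL,
//      "contractAddress" VARCHAR(70) NOT NULL,
//      "blockNumber" Numeric NOT NULL,
//      "from" VARCHAR(50) NOT NULL,"to" VARCHAR(50) NOT NULL,"value" Numeric NOT NULL,
//      PRIMARY KEY ("transactionHash")
//    );
// note: the unquoted table name is folded to lowercase by Postgres,
// which is why the insert code later uses Name.toLowerCase()

console.log(basicIndexStructure("index_Transfer", `Transfer("from","to")`));
// -> CREATE INDEX index_Transfer
//    ON Transfer("from","to");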

To consume these templates and create the tables, we have the following code. We import the templates and the signature file created earlier, then create the tables and the indexes on them.

var { pool } = require("../module/postgresDB");
var {
  basicTableStructure,
  basicIndexStructure,
  eventListed,
  insertEventListed,
  setSearchColumeFunction,
} = require("../constants/constant");
var event_signature = require("../config/event_signature.json"); // load the signatures
var { configObj } = require("../config/config");

// helper to execute a query
const execute = async (query) => {
  try {
    await pool.query(query); // sends the query
    return true;
  } catch (error) {
    // console.error(error.stack);
    return false;
  }
};

module.exports.CreateTables = async () => {
  try {
    await pool.connect(); // gets a connection
    console.log("Connected to Postgres");
  } catch (error) {
    console.log(error);
    process.exit(1);
  }
  console.log("Creating Event Table ....");
  // this table holds default information about the event service, e.g. the contract
  // and the last processed block number, so next time we can resume from that point
  await execute(eventListed).then(async (result) => {
    if (result) {
      console.log("Event Table created");
      result = await execute(insertEventListed(configObj));
      if (result) {
        console.log("Basic Info Added Event");
      }
    }
  });
  // iterate over the event signatures
  for (const eventTabl in event_signature) {
    var tableFields = "";
    var indexField = "";
    // console.log(`${eventTabl}: ${event_signature[eventTabl]}`);
    for (const i in event_signature[eventTabl].fieldsName) {
      tableFields += `"${event_signature[eventTabl].fieldsName[i]}"`;
      // map the event parameter type to a table column type
      if (event_signature[eventTabl].fieldsType[i] == "uint256") {
        tableFields += " Numeric NOT NULL,";
      } else {
        tableFields += " VARCHAR(50) NOT NULL,";
      }
    }
    for (const i in event_signature[eventTabl].Index) {
      if (i > 0) {
        indexField += ",";
      }
      indexField += `"${event_signature[eventTabl].Index[i]}"`;
    }
    indexField = event_signature[eventTabl].Name + "(" + indexField + ")";
    // pass the new fields to the template and create the table
    await execute(
      basicTableStructure(event_signature[eventTabl].Name, tableFields)
    ).then(async (result) => {
      if (result) {
        console.log(event_signature[eventTabl].Name, " Table created");
        if (event_signature[eventTabl].Index.length != 0) {
          result = await execute(
            basicIndexStructure(
              "index_" + event_signature[eventTabl].Name,
              indexField
            )
          );
          if (result) {
            console.log(
              "Indexing Created On " +
                event_signature[eventTabl].Name +
                " Event"
            );
          }
        }
      } else {
        console.log(
          "Table Already Exists",
          event_signature[eventTabl].Name
        );
      }
    });
  }
  // await pool.end();
  console.log("Creating Search Column Function on Table ....");
  // setSearchColumeFunction is a function that returns the SQL, so it must be called here
  await execute(setSearchColumeFunction()).then(async (result) => {
    if (result) {
      console.log("Search Column Function created");
    }
  });
  console.log("Done");
  process.exit(0);
};
// CreateTables();
// console.log("configObj", process.env);
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/constants/constant.js
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/utils/setupDB.js
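The module/postgresDB helper that exports pool (and the execute used by the event listener below) is not shown in the article; a minimal sketch, assuming the pg package and the environment variables from the .env file, might look like this:

// module/postgresDB.js – minimal sketch (assumption: the real file in the repo may differ)
const { Pool } = require("pg");

const pool = new Pool({
  host: process.env.POSTGRESSDB_ADDRESS,
  port: process.env.POSTGRESSDB_PORT,
  database: process.env.POSTGRESSDB_DB,
  user: process.env.POSTGRESSDB_USER,
  password: process.env.POSTGRESSDB_PASS,
});

// run a query and return both a status flag and the result,
// which is the shape the event listener below expects ({ status, result })
const execute = async (query, client) => {
  try {
    const result = await client.query(query);
    return { status: true, result };
  } catch (error) {
    return { status: false, result: null };
  }
};

module.exports = { pool, execute };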

Get Events from the Blockchain:

Now it is time to get the event logs from the blockchain and, based on the event type, store the data in the database.

Steps:
1. First, we load the SQL templates.
2. Load the ABI signature file.
3. Decode the data: some values in the event are hex-encoded, so we decode them based on their data type to get meaningful output.
4. We use web3's "logs" subscription with filter options for the contract address and the starting block number.
5. It is based on WebSocket, so whenever a new event is found it is stored in the DB and the starting point in the DB is updated.

// Setup: npm install web3
const Web3 = require("web3");

var { configObj } = require("../config/config");
var { pool, execute } = require("../module/postgresDB");
var {
  getStartingBlock,
  basicInsertStructure,
  updateStartBlock,
  updateTotalAmountTransfer,
} = require("./../constants/constant");
var event_signature = require("../config/event_signature.json");
var { decodeData } = require("./utils/dcoder");

// connect to Infura
const RPC_ENDPOINT = `wss://mainnet.infura.io/ws/v3/${configObj.INFURA_APIKEY}`;
const web3 = new Web3(RPC_ENDPOINT);

// subscribe to logs over the Infura WebSocket RPC to get real-time chain logs
module.exports.event = async () => {
  await pool.connect(); // gets a connection
  // read the block number we should start from; it is updated every time a new block is processed,
  // so when we stop and start again we resume from where we left off
  var response = await execute(
    getStartingBlock(configObj.CONTRACT_ADDRESS),
    pool
  );
  if (!response.status) {
    console.log("Unable to fetch Record ");
    process.exit(1);
  }
  // console.log("startBlockNumber", response.result.rows[0].startBlockNumber);
  var startBlockNumber = response.result.rows[0].startBlockNumber;
  // filter: which address we want to monitor and from which block
  let options = {
    fromBlock: startBlockNumber,
    address: [configObj.CONTRACT_ADDRESS], // only get events from specific addresses
    topics: [], // what topics to subscribe to
  };
  var count = 0;
  let subscription = web3.eth.subscribe(
    "logs",
    options,
    async (err, event) => {
      if (!err) {
        console.log(count, event);
        count++;
        // we got an event log; send it to be stored in the DB
        await insertEventData(pool, event);

        var response = await execute(
          updateStartBlock(
            event.blockNumber,
            configObj.CONTRACT_ADDRESS
          ),
          pool
        );
        if (response.status) {
          console.log("update Starting Block Number ");
        }
        // process.exit(1);
      }
    }
  );
};

Logic to store data in the DB

Steps:
1. First, we read the basic information from the event object, like the transaction hash and block number.
2. As discussed, topic zero is the event signature hash, so we use it to identify which event occurred and load its signature (created earlier).
3. Load the field names and build an INSERT query for the DB.
4. We also check whether the event has anything in its data field; if yes, that value is decoded and appended to the query as well before the final query is generated.


/**
 * This function inserts data into the respective table
 * based on the event's signature.
 * @param {*} pool PostgresDB connection object
 * @param {*} eventObject // the log object
 */
async function insertEventData(pool, eventObject) {
  var tableFields = "";
  // these fields are common to all tables
  var tabledata =
    `'${eventObject.transactionHash}'` +
    "," +
    `'${eventObject.address}'` +
    "," +
    `${eventObject.blockNumber}` +
    ",";
  // build the list of column names for this event's table
  // from the field names stored in the signature file
  for (const i in event_signature[eventObject.topics[0]].fieldsName) {
    if (i != 0) tableFields += ",";
    tableFields += `"${
      event_signature[eventObject.topics[0]].fieldsName[i]
    }"`;
  }
  // get the indexed values from the topics and decode them based on their type
  for (let index = 1; index < eventObject.topics.length; index++) {
    var data = decodeData(
      event_signature[eventObject.topics[0]].fieldsType[index - 1],
      eventObject.topics[index]
    );
    if (index != 1) {
      tabledata += ",";
    }
    tabledata += `'${data}'`;
  }
  // if the event carries non-indexed values in its data field
  // (web3 returns "0x" when the data field is empty)
  if (eventObject.data != "" && eventObject.data != "0x") {
    var len = event_signature[eventObject.topics[0]].fieldsType.length;
    var data = decodeData(
      event_signature[eventObject.topics[0]].fieldsType[len - 1],
      eventObject.data
    );
    if (
      event_signature[eventObject.topics[0]].fieldsType[len - 1] ==
      "uint256"
    ) {
      tabledata += `,${data}`;
      // if the value is a number and the signature is Transfer, add it to the total amount transferred in the DB
      if (eventObject.topics[0] == configObj.TRANSFER_SIGNATURE) {
        const etherValue = Web3.utils.fromWei(data.toString(), "ether");
        console.log("etherValue", etherValue);
        var response = await execute(
          updateTotalAmountTransfer(
            etherValue,
            configObj.CONTRACT_ADDRESS
          ),
          pool
        );
        if (response.status) {
          console.log("update Transfer Amount");
        }
      }
    } else {
      tabledata += `,'${data}'`;
    }
  }
  // get the table name based on the event's signature
  var tableName = `public."${event_signature[
    eventObject.topics[0]
  ].Name.toLowerCase()}"`;
  // insert the row into the table
  var response = await execute(
    basicInsertStructure(tableName, tableFields, tabledata),
    pool
  );
  if (!response.status) {
    console.log(
      "Unable to Insert Record ",
      event_signature[eventObject.topics[0]],
      "Transaction hash ",
      eventObject.transactionHash
    );
    // process.exit(1);
  }
}
https://github.com/touqeerShah/Ethereum-Event-Indexer/blob/main/event/event_listener.js
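The decodeData helper (event/utils/dcoder.js) is only imported above, not shown; a minimal sketch, assuming it simply wraps web3's ABI decoder, could be:

// event/utils/dcoder.js – minimal sketch (assumption: the real implementation may differ)
const Web3 = require("web3");
const web3 = new Web3();

// decode a single hex-encoded value (a topic or the data field) according to its Solidity type
module.exports.decodeData = (type, hexValue) => {
  // e.g. decodeData("address", topics[1]) -> "0xAbc...", decodeData("uint256", data) -> "1000"
  return web3.eth.abi.decodeParameter(type, hexValue);
};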

REST API:

We also expose a few REST endpoints so you can play with the DB data after the events are indexed; you can extend them and add new APIs based on your needs.

const express = require("express");
const {
  getTotalAmount,
  verifyHashDB,
  verifyHash,
} = require("../controller/blockchainController");
const router = express.Router();
// the routes we use to expose the backend service
router.get("/getTotalAmount", getTotalAmount);
router.get("/verifyHashDB", verifyHashDB);
router.get("/verifyHash", verifyHash);

module.exports = router;
The Postman collection below can be used to test these endpoints:

{
  "info": {
    "_postman_id": "5b6e2a48-d7d3-43a0-9bb5-4738afe5d66e",
    "name": "Task",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json",
    "_exporter_id": "11496047"
  },
  "item": [
    {
      "name": "getTotalAmount",
      "request": {
        "method": "GET",
        "header": [],
        "url": {
          "raw": "http://localhost:8080/api/getTotalAmount",
          "protocol": "http",
          "host": ["localhost"],
          "port": "8080",
          "path": ["api", "getTotalAmount"]
        }
      },
      "response": []
    },
    {
      "name": "verifyHash",
      "request": {
        "method": "GET",
        "header": [],
        "url": {
          "raw": "http://localhost:8080/api/verifyHash?transactionHash=0x320b95cef4c3cfe7ffca91f8bd9e5734cdba99cafc430004a50b9f553194929a",
          "protocol": "http",
          "host": ["localhost"],
          "port": "8080",
          "path": ["api", "verifyHash"],
          "query": [
            {
              "key": "transactionHash",
              "value": "0x320b95cef4c3cfe7ffca91f8bd9e5734cdba99cafc430004a50b9f553194929a"
            }
          ]
        }
      },
      "response": []
    }
  ]
}
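The controller functions referenced by these routes live in controller/blockchainController.js and are not shown in the post; a minimal sketch of getTotalAmount, assuming the query helpers and the postgres module described above, might look like this:

// controller/blockchainController.js – minimal sketch of one handler (assumption: the real code may differ)
const { pool, execute } = require("../module/postgresDB");
const { getTotalAmountTransfer } = require("../constants/constant");
const { configObj } = require("../config/config");

// GET /api/getTotalAmount – return the running total of transferred tokens
module.exports.getTotalAmount = async (req, res) => {
  const response = await execute(
    getTotalAmountTransfer(configObj.CONTRACT_ADDRESS),
    pool
  );
  if (!response.status) {
    return res.status(500).json({ error: "Unable to fetch total amount" });
  }
  return res.json({
    totalAmountTransfer: response.result.rows[0].totalAmountTransfer,
  });
};

With the listener running, you can hit http://localhost:8080/api/getTotalAmount (as in the Postman collection above) to read the running total.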
