Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

checkForMissedBlocks is scheduled using nested setTimeout #16

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions build/checkForLatestBlock.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ const checkForLatestBlock = async () => {
// Nothing to see, move along, solider
return;
}
if (variables_1.lastHeight !== 0 && currentHeight - variables_1.lastHeight !== 1) {
await (0, utils_1.checkForMissedBlocks)();
}
// Logic for new block that came in
(0, variables_1.updateLastHeight)(currentHeight);
console.log('New block height:', currentHeight);
Expand Down
54 changes: 13 additions & 41 deletions build/entities/tasks/tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,17 @@ const saveTaskDetails = async () => {
await Promise.all(promises);
}
catch (e) {
console.error('fuck me running6', e);
console.error('Error querying/inserting tasks', e);
}
};
exports.saveTaskDetails = saveTaskDetails;
const saveTasks = async (contractAddress, getTasksMsg, blockHeight, contractBlockIdFk) => {
console.log('aloha10');
const tasks = await (0, utils_1.queryContractAtHeight)(contractAddress, getTasksMsg, blockHeight);
let promises = [];
for (const task of tasks) {
promises.push(saveTask(task, contractBlockIdFk));
}
console.log('aloha6');
try {
await Promise.all(promises);
}
catch (e) {
console.error('fuck me running7', e);
}
await Promise.all(promises);
};
const saveTask = async (task, contractBlockIdFk) => {
let intervalType;
Expand Down Expand Up @@ -85,14 +78,7 @@ const saveTask = async (task, contractBlockIdFk) => {
console.warn('Unexpected boundary variant for task', task);
}
}
console.log('aloha8');
let taskRes;
try {
taskRes = await (0, variables_1.db)('js_tasks').insert(taskToInsert, 'id');
}
catch (e) {
console.error('aloha taskRes error', e, taskToInsert);
}
const taskRes = await (0, variables_1.db)('js_tasks').insert(taskToInsert, 'id');
const taskFkId = taskRes[0].id;
// console.log('taskFkId', taskRes)
let promises = [];
Expand Down Expand Up @@ -134,29 +120,15 @@ const saveTask = async (task, contractBlockIdFk) => {
}));
}
// Task rules (actions)
for (const rule of task.rules) {
const ruleVariant = Object.keys(rule.msg)[0];
promises.push((0, variables_1.db)('js_task_rules').insert({
fk_task_id: taskFkId,
rule_variant: ruleVariant,
data: task[ruleVariant]
}));
}
// Task funds withdrawn (funds_withdrawn_recurring)
// NOTE: at the time of this writing, it seems we're only tracking native tokens
for (const fundsWithdrawnNative of task.funds_withdrawn_recurring) {
promises.push((0, variables_1.db)('js_task_deposits').insert({
fk_task_id: taskFkId,
type: 'native',
denom: fundsWithdrawnNative.denom,
amount: fundsWithdrawnNative.amount
}));
}
console.log('aloha7');
try {
await Promise.all(promises);
}
catch (e) {
console.error('fuck me running8', e);
if (task.rules && Array.isArray(task.rules)) {
for (const rule of task.rules) {
const ruleVariant = Object.keys(rule.msg)[0];
promises.push((0, variables_1.db)('js_task_rules').insert({
fk_task_id: taskFkId,
rule_variant: ruleVariant,
data: task[ruleVariant]
}));
}
}
await Promise.all(promises);
};
2 changes: 1 addition & 1 deletion build/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const setup = async () => {
// Fill out extra transaction detail (gas used vs wanted, etc.)
setInterval(() => (0, addTxDetail_1.addTxDetail)(), variables_1.TIMEOUT * 2);
// Check for gaps in blocks
setInterval(() => (0, utils_1.checkForMissedBlocks)(), variables_1.TIMEOUT * 2);
(0, variables_1.updateBlocksTimerId)(setTimeout(utils_1.checkForMissedBlocks, variables_1.TIMEOUT * 2));
// Check for fk_contract_id in messages
setInterval(() => (0, addContractId_1.addContractId)(), variables_1.TIMEOUT * 2);
// Check for synced blocks
Expand Down
42 changes: 11 additions & 31 deletions build/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ exports.v = v;
// Half-finished work here
// We're basically trying to keep track of all heights we're aware of
const addSeenHeight = async (height) => {
console.log('aloha11');
if (variables_1.blockHeights.length === 0) {
variables_1.blockHeights.push(height);
}
Expand Down Expand Up @@ -103,12 +102,12 @@ const addSeenHeight = async (height) => {
};
exports.addSeenHeight = addSeenHeight;
const checkForMissedBlocks = async () => {
console.log('aloha12');
let keepGoing = false;
// We use length - 1 since we can't compare past that
for (let i = 0; i < variables_1.blockHeights.length - 1; i++) {
// Go until we see the first gap, keep it simple
if (variables_1.blockHeights[i] - variables_1.blockHeights[i + 1] !== 1) {
// blockHeights[i] can't be less or equal to blockHeights[i + 1]
if (variables_1.blockHeights[i] - variables_1.blockHeights[i + 1] > 1) {
keepGoing = true;
// Do stuff to add block
const missingBlockNum = variables_1.blockHeights[i] - 1;
Expand All @@ -121,11 +120,11 @@ const checkForMissedBlocks = async () => {
await (0, checkForLatestBlock_1.handleBlockTxs)(missingBlockNum, blockTxs, isoBlockTime);
}
}
(0, variables_1.updateBlocksTimerId)(setTimeout(exports.checkForMissedBlocks, variables_1.TIMEOUT));
};
exports.checkForMissedBlocks = checkForMissedBlocks;
// Credit to Fisher-Yates, SO and eventually https://www.webmound.com/shuffle-javascript-array
const shuffleRPCs = (rpcs) => {
console.log('aloha13');
rpcs.reverse().forEach((item, index) => {
const j = Math.floor(Math.random() * (index + 1));
[rpcs[index], rpcs[j]] = [rpcs[j], rpcs[index]];
Expand All @@ -134,17 +133,15 @@ const shuffleRPCs = (rpcs) => {
};
exports.shuffleRPCs = shuffleRPCs;
const setRPCClients = async (chains) => {
console.log('aloha14');
let newRPCs = [];
for (let i = 0; i < chains.length; i++) {
const address = chains[i].address;
try {
// const httpBatchClient = new HttpBatchClient(address, {
// batchSizeLimit: 5,
// dispatchInterval: TIMEOUT
// })
// const client: any = await Tendermint34Client.create(httpBatchClient)
const client = await tendermint_rpc_1.Tendermint34Client.connect(address);
const httpBatchClient = new tendermint_rpc_1.HttpBatchClient(address, {
batchSizeLimit: 5,
dispatchInterval: variables_1.TIMEOUT
});
const client = await tendermint_rpc_1.Tendermint34Client.create(httpBatchClient);
const queryClient = stargate_1.QueryClient.withExtensions(client, cosmwasm_stargate_1.setupWasmExtension);
let rpcConnection = {
client,
Expand Down Expand Up @@ -193,7 +190,6 @@ const bigIntMe = (theNotBigIntYet) => {
exports.bigIntMe = bigIntMe;
// Quite a useful function to send a query message to a contract at a given block height
const queryContractAtHeight = async (address, args, height) => {
console.log('aloha15');
// Turn JSON object into a string, then into buffer of bytes
const queryReadableBytes = Buffer.from(JSON.stringify(args));
const queryBase64 = (0, exports.base64FromBytes)(queryReadableBytes);
Expand Down Expand Up @@ -235,49 +231,33 @@ const addRPCs = (rpcs) => {
};
exports.addRPCs = addRPCs;
const getLatestBlockHeight = async () => {
console.log('aloha20');
// This uses the "regular" client, not the QueryClient
const clientStatuses = variables_1.allRPCConnections.map(conn => conn.client.status());
const firstBlockHeight = (await Promise.any(clientStatuses)).syncInfo.latestBlockHeight;
return firstBlockHeight;
};
exports.getLatestBlockHeight = getLatestBlockHeight;
const getBlockInfo = async (height) => {
console.log('aloha21a for height', height);
// This uses the "regular" client, not the QueryClient
// const clientBlocks = allRPCConnections.map(conn => conn.client.block(height))
let cmon = null;
try {
cmon = await variables_1.allRPCConnections[0].client.block(height);
}
catch (e) {
console.error('fuck', e);
}
console.log('aloha21b');
return cmon;
// const blockDetails = await Promise.any(clientBlocks).catch(e => {
// console.log('wtf', e)
// })
// return blockDetails
const clientBlocks = variables_1.allRPCConnections.map(conn => conn.client.block(height));
const blockDetails = await Promise.any(clientBlocks);
return blockDetails;
};
exports.getBlockInfo = getBlockInfo;
const getTxInfo = async (hash) => {
console.log('aloha22');
const txHash = Buffer.from((0, encoding_1.fromHex)(hash));
const clientTxs = variables_1.allRPCConnections.map(conn => conn.client.tx({ hash: txHash }));
const txDetails = await Promise.any(clientTxs);
return txDetails;
};
exports.getTxInfo = getTxInfo;
const queryUnverified = async (path, requestObj, height) => {
console.log('aloha17');
const queryClientUnverifieds = variables_1.allRPCConnections.map(conn => conn.queryClient.queryUnverified(path, requestObj, height));
const queryRespEncoded = await Promise.any(queryClientUnverifieds);
return queryRespEncoded;
};
exports.queryUnverified = queryUnverified;
const getContractInfo = async (address) => {
console.log('aloha16');
const queryClientContractInfos = variables_1.allRPCConnections.map(conn => conn.queryClient.wasm.getContractInfo(address));
const queryContractInfo = await Promise.any(queryClientContractInfos);
return queryContractInfo;
Expand Down
6 changes: 5 additions & 1 deletion build/variables.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.db = exports.updateStateTimerId = exports.getStateTimerId = exports.updateLastHeight = exports.lastHeight = exports.updateBlockHeights = exports.blockHeights = exports.contractAddresses = exports.settings = exports.ADD_RPC_ADDRESSES_ALWAYS = exports.ADD_RPC_ADDRESSES = exports.SKIP_RPC_ADDRESSES = exports.VERBOSITY = exports.CHAIN_REGISTRY_URLS = exports.CACHE_LIMIT = exports.RPC_LIMIT = exports.TIMEOUT_CHECK_CHAIN_REGISTRY = exports.TIMEOUT = exports.setAllRPCConnections = exports.allRPCConnections = exports.agents = exports.emptyHeights = exports.blockMap = exports.CHAIN_ID_PREFIX = exports.CHAIN_ID = void 0;
exports.db = exports.updateBlocksTimerId = exports.getBlocksTimerId = exports.updateStateTimerId = exports.getStateTimerId = exports.updateLastHeight = exports.lastHeight = exports.updateBlockHeights = exports.blockHeights = exports.contractAddresses = exports.settings = exports.ADD_RPC_ADDRESSES_ALWAYS = exports.ADD_RPC_ADDRESSES = exports.SKIP_RPC_ADDRESSES = exports.VERBOSITY = exports.CHAIN_REGISTRY_URLS = exports.CACHE_LIMIT = exports.RPC_LIMIT = exports.TIMEOUT_CHECK_CHAIN_REGISTRY = exports.TIMEOUT = exports.setAllRPCConnections = exports.allRPCConnections = exports.agents = exports.emptyHeights = exports.blockMap = exports.CHAIN_ID_PREFIX = exports.CHAIN_ID = void 0;
// Contracts we'll want to look for being called
const db_1 = require("./db");
const dotenv_1 = require("dotenv");
Expand Down Expand Up @@ -47,4 +47,8 @@ const updateStateTimerId = (newTimer) => {
exports.getStateTimerId = newTimer;
};
exports.updateStateTimerId = updateStateTimerId;
const updateBlocksTimerId = (newBlocksTimer) => {
exports.getBlocksTimerId = newBlocksTimer;
};
exports.updateBlocksTimerId = updateBlocksTimerId;
exports.db = (0, db_1.getDb)();
1 change: 1 addition & 0 deletions src/checkForLatestBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export const handleBlockTxs = async (height: number, blockTxs, isoBlockTime: str
wasmExecTxs.push(simpleTx)
}
})

if (wasmExecTxs.length !== 0) {
console.log('Found transaction(s) interacting with our contract(s) on this block…')
await addSeenHeight(height)
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
settings,
CHAIN_REGISTRY_URLS,
TIMEOUT,
updateStateTimerId, TIMEOUT_CHECK_CHAIN_REGISTRY
updateStateTimerId, TIMEOUT_CHECK_CHAIN_REGISTRY, updateBlocksTimerId
} from "./variables"
import {addRPCs, checkForMissedBlocks, setRPCClients, shuffleRPCs, skipRPCs} from "./utils"
import fetch from 'node-fetch'
Expand Down Expand Up @@ -47,7 +47,7 @@ const setup = async () => {
setInterval(() => addTxDetail(), TIMEOUT * 2)

// Check for gaps in blocks
setInterval(() => checkForMissedBlocks(), TIMEOUT * 2)
updateBlocksTimerId(setTimeout(checkForMissedBlocks, TIMEOUT * 2));

// Check for fk_contract_id in messages
setInterval(() => addContractId(), TIMEOUT * 2)
Expand Down
6 changes: 4 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
blockHeights,
RPC_LIMIT,
setAllRPCConnections, SKIP_RPC_ADDRESSES, TIMEOUT,
updateBlockHeights, VERBOSITY
updateBlockHeights, updateBlocksTimerId, VERBOSITY
} from "./variables";
import {BlockResponse, HttpBatchClient, Tendermint34Client, TxResponse} from "@cosmjs/tendermint-rpc";
import {QueryClient} from "@cosmjs/stargate";
Expand Down Expand Up @@ -96,7 +96,8 @@ export const checkForMissedBlocks = async () => {
// We use length - 1 since we can't compare past that
for (let i = 0; i < blockHeights.length - 1; i++) {
// Go until we see the first gap, keep it simple
if (blockHeights[i] - blockHeights[i + 1] !== 1) {
// blockHeights[i] can't be less or equal to blockHeights[i + 1]
if (blockHeights[i] - blockHeights[i + 1] > 1) {
keepGoing = true
// Do stuff to add block
const missingBlockNum = blockHeights[i] - 1
Expand All @@ -110,6 +111,7 @@ export const checkForMissedBlocks = async () => {
await handleBlockTxs(missingBlockNum, blockTxs, isoBlockTime)
}
}
updateBlocksTimerId(setTimeout(checkForMissedBlocks, TIMEOUT));
}

// Credit to Fisher-Yates, SO and eventually https://www.webmound.com/shuffle-javascript-array
Expand Down
5 changes: 5 additions & 0 deletions src/variables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@ export const updateStateTimerId = (newTimer) => {
getStateTimerId = newTimer
}

export let getBlocksTimerId
export const updateBlocksTimerId = (newBlocksTimer) => {
getBlocksTimerId = newBlocksTimer
}

export const db = getDb()