我有一个ETL查询,它要求我读取大量行,然后对它们应用一些转换,并将它们保存回Postgres中的一个单独的表中。我正在使用pg-query,并且我计划在一个test作业中运行下面的测试函数。
如何度量下面给定流的进度(当前行处理/行总数)?
const { Pool } = require('pg');
const JSONStream = require('JSONStream');
const QueryStream = require('pg-query-stream');
function test() {
const pool = new Pool({
database: process.env.POSTGRES_DB,
host: process.env.POSTGRES_HOST,
port: +process.env.POSTGRES_PORT,
password: process.env.POSTGRES_PASSWORD,
ssl: process.env.POSTGRES_SSL === 'true',
user: process.env.POSTGRES_USER,
});
const query = `
SELECT
feed_item_id,
title,
summary,
content
FROM
feed_items
ORDER BY
pubdate DESC,
feed_item_id
LIMIT 50
`;
pool.connect((err, client, done) => {
if (err) throw err;
const queryStream = new QueryStream(query, [], {
batchSize: 200,
});
const stream = client.query(queryStream);
console.log(stream);
stream.pipe(JSONStream.stringify());
stream.on('error', (error) => {
console.error(error);
done();
});
stream.on('end', () => {
console.log('stream has ended');
done();
});
stream.on('data', async (row) => {
stream.pause();
console.log('data received', row.feed_item_id);
// const progress = index / ???
// Simulate async task
await timeout(10);
stream.resume();
});
});
}
test();发布于 2022-04-14 14:56:56
要做到这一点,您需要首先获得表中行的计数。
你可以以described here的身份来做。
有了这个计数,您就可以将QueryStream输送到另一个流中,并在那里测量进度:
const Stream = require('stream')
class ProgressStream extends Stream.Writable{
constructor(total) {
super();
this.i = 0;
this.total = total;
}
_write(chunk, enc, done) {
this.i++;
if (this.i % 100 === 1) {
console.log(`Processed ${this.i} out of ${this.total} of rows`);
}
done();
}
}
const progressStream = new ProgressStream(count);
progressStream.on('finish', () => {
console.log(`All ${progressStream.i} rows processed!`)
});
stream.pipe(progressStream);https://stackoverflow.com/questions/71048993
复制相似问题