首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >我如何衡量进度,同时通过pg查询流流的postgres数据库查询?

我如何衡量进度,同时通过pg查询流流的postgres数据库查询?
EN

Stack Overflow用户
提问于 2022-02-09 11:43:45
回答 1查看 459关注 0票数 1

我有一个ETL查询,它要求我读取大量行,然后对它们应用一些转换,并将它们保存回Postgres中的一个单独的表中。我正在使用pg-query,并且我计划在一个test作业中运行下面的测试函数。

如何度量下面给定流的进度(当前行处理/行总数)?

代码语言:javascript
复制
const { Pool } = require('pg');
const JSONStream = require('JSONStream');
const QueryStream = require('pg-query-stream');

function test() {
  const pool = new Pool({
    database: process.env.POSTGRES_DB,
    host: process.env.POSTGRES_HOST,
    port: +process.env.POSTGRES_PORT,
    password: process.env.POSTGRES_PASSWORD,
    ssl: process.env.POSTGRES_SSL === 'true',
    user: process.env.POSTGRES_USER,
  });
  const query = `
    SELECT 
        feed_item_id, 
        title, 
        summary, 
        content 
    FROM 
        feed_items 
    ORDER BY 
        pubdate DESC, 
        feed_item_id
    LIMIT 50
    `;

  pool.connect((err, client, done) => {
    if (err) throw err;
    const queryStream = new QueryStream(query, [], {
      batchSize: 200,
    });
    const stream = client.query(queryStream);
    console.log(stream);
    stream.pipe(JSONStream.stringify());
    stream.on('error', (error) => {
      console.error(error);
      done();
    });
    stream.on('end', () => {
      console.log('stream has ended');
      done();
    });
    stream.on('data', async (row) => {
      stream.pause();
      console.log('data received', row.feed_item_id);
      //   const progress = index / ???
      //   Simulate async task
      await timeout(10);
      stream.resume();
    });
  });
}

test();
EN

回答 1

Stack Overflow用户

发布于 2022-04-14 14:56:56

要做到这一点,您需要首先获得表中行的计数。

你可以以described here的身份来做。

有了这个计数,您就可以将QueryStream输送到另一个流中,并在那里测量进度:

代码语言:javascript
复制
const Stream = require('stream')

class ProgressStream extends Stream.Writable{
  constructor(total) {
    super();
    this.i = 0;
    this.total = total;
  }

  _write(chunk, enc, done) {
    this.i++;
    if (this.i % 100 === 1) {
      console.log(`Processed ${this.i} out of ${this.total} of rows`);
    }
    done();
  }
}

const progressStream = new ProgressStream(count);
progressStream.on('finish', () => {
  console.log(`All ${progressStream.i} rows processed!`)
});

stream.pipe(progressStream);
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71048993

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档