
I have an ETL job that requires me to read a large number of rows, apply a transformation to them, and save them back to a separate table in Postgres. I am using pg-query-stream, and I plan to run the test function below inside a BullMQ job.

How do I measure progress (current rows processed / total number of rows) for the given stream below?

const { Pool } = require('pg');
const JSONStream = require('JSONStream');
const QueryStream = require('pg-query-stream');

// Promise-based delay used to simulate the async transformation below
const timeout = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function test() {
  const pool = new Pool({
    database: process.env.POSTGRES_DB,
    host: process.env.POSTGRES_HOST,
    port: +process.env.POSTGRES_PORT,
    password: process.env.POSTGRES_PASSWORD,
    ssl: process.env.POSTGRES_SSL === 'true',
    user: process.env.POSTGRES_USER,
  });
  const query = `
    SELECT 
        feed_item_id, 
        title, 
        summary, 
        content 
    FROM 
        feed_items 
    ORDER BY 
        pubdate DESC, 
        feed_item_id
    LIMIT 50
    `;

  pool.connect((err, client, done) => {
    if (err) throw err;
    const queryStream = new QueryStream(query, [], {
      batchSize: 200,
    });
    const stream = client.query(queryStream);
    console.log(stream);
    stream.pipe(JSONStream.stringify());
    stream.on('error', (error) => {
      console.error(error);
      done();
    });
    stream.on('end', () => {
      console.log('stream has ended');
      done();
    });
    stream.on('data', async (row) => {
      stream.pause();
      console.log('data received', row.feed_item_id);
      //   const progress = index / ???
      //   Simulate async task
      await timeout(10);
      stream.resume();
    });
  });
}

test();
  • Should be very easy via pg-iterator, because you can control every next-row read. Commented Nov 23, 2022 at 10:30
  • @vitaly-t I'll definitely take a look into this one, thank you for the heads-up. Commented Nov 23, 2022 at 12:30

1 Answer

To do that, you'll first need to get the total count of rows in the table.
You can do it as described here.
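As a minimal sketch of that step (`getTotalRows` and `progressPercent` are illustrative helper names, not part of any library), you can fetch the total up front and turn a running counter into a percentage:

```javascript
// Illustrative helpers: fetch the total before streaming, then derive
// a percentage from the number of rows processed so far.
const countQuery = 'SELECT COUNT(*) AS total FROM feed_items';

async function getTotalRows(pool) {
  const { rows } = await pool.query(countQuery);
  // node-postgres returns COUNT(*) (a bigint) as a string, so coerce it
  return Number(rows[0].total);
}

function progressPercent(processed, total) {
  return total === 0 ? 100 : Math.round((processed / total) * 100);
}
```

Note that the count is a snapshot: if rows are inserted while the stream is running, the percentage is approximate.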

And with that count you can pipe QueryStream into another stream and measure progress there:

const Stream = require('stream')

class ProgressStream extends Stream.Writable {
  constructor(total) {
    // QueryStream emits row objects, so the Writable must run in objectMode
    super({ objectMode: true });
    this.i = 0;
    this.total = total;
  }

  _write(chunk, enc, done) {
    this.i++;
    if (this.i % 100 === 1) {
      console.log(`Processed ${this.i} out of ${this.total} rows`);
    }
    done();
  }
}

const progressStream = new ProgressStream(count);
progressStream.on('finish', () => {
  console.log(`All ${progressStream.i} rows processed!`)
});

stream.pipe(progressStream);
