4

I am using iojs and node-mysql. This is my first foray into async server-side programming. It is essentially a batch job: run it all once, exit. I am specifically trying to do this on a table full of revisions:

For each document edited in the past year; for each revision to that document made in the past year; get the prior revision and diff its content with the current revision.

So, I am using the results of one query (for each document) to fire off an arbitrary number of additional queries (for each revision) that must themselves recurse (get the prior revision).

I can't figure out how to close the database connection. Near as I can tell, the recursion is the confounding factor: If I remove that from the code, then I can close the db connection. But I need to recurse.

Here is a minimal example (assume the requires and config are OK) that reproduces the behavior I'm seeing in my program.

// Open one shared connection; every query below is issued on it.
var con = mysql.createConnection(db_config);
con.connect();

// First-level query: start one second_query chain per returned row.
con.query('SELECT field_2 FROM test_table', function(err, rows) {
    if (err) throw err;
    rows.forEach(function(row) {
        second_query(row.field_2);
    });

    // using this here works if there is no recursion, even if second_query runs long
    // using this here does not work if there is recursion
    // removing this allows the program to run to completion, but the event loop never exits
    // (end() is called as soon as the forEach above has issued the first round of
    // queries; the follow-up queries are only issued later, from inside the
    // second_query callbacks, i.e. after end() has already been requested)
    con.end()
});

// Logs field_2 of every test_table_2 row whose field_2 is greater than `key`,
// then repeats with key + 1. The chain stops at the first query that returns
// no rows. Query errors are rethrown from the callback.
function second_query(key) {
    var sql = 'SELECT * FROM test_table_2 WHERE field_2 > ?';

    function printField(row) {
        console.log(row.field_2);
    }

    function handleResult(err, rows) {
        if (err) throw err;
        if (!(rows.length > 0)) {
            // Empty result: this chain is finished.
            return;
        }
        rows.forEach(printField);
        second_query(key + 1);
    }

    con.query(sql, [key], handleResult);
}

I have tried very hard to solve the problem by registering database queries in an accumulator and decrementing the accumulator as each query ends, but that hasn't produced predictable success yet and it makes the code painful to work with.

2 Answers 2

3

I like async.queue for this type of workload. You get trivially-adjustable concurrency for free, but it's always easiest to debug with concurrency 1.

var mysql = require("mysql");
var asyncLib = require("async");

// Connection settings; the host is read from the DOCKER_IP environment variable.
var dbOptions = {
  host: process.env.DOCKER_IP,
  database: "hoosteeno",
  user: "root",
  password: "password"
};

// Single shared connection used by every query in this script.
var con = mysql.createConnection(dbOptions);
con.connect();

// Work queue whose worker is secondQuery. The second argument is the
// concurrency: start at 1 (easiest to debug) and raise it once things work.
var queue = asyncLib.queue(secondQuery, 1);

// Seed the queue with one task per row returned by the first-level query.
con.query("SELECT field_2 FROM test_table", function (error, rows) {
  if (error) throw error;
  if (rows.length === 0) {
    // Nothing will be queued, so the queue's drain handler would never fire.
    // Close the connection here, otherwise the process never exits.
    con.end();
    return;
  }
  rows.forEach(function (row) {
    queue.push(row.field_2);
  });
});

/**
 * Queue worker. Logs field_2 of every test_table_2 row whose field_2 is
 * greater than `key`; when at least one row came back, queues a follow-up
 * task for key + 1.
 *
 * @param {*} key - value bound to the query's `?` placeholder
 * @param {Function} callback - called with the query error on failure, or with
 *   no arguments once the rows have been handled
 */
function secondQuery (key, callback) {
  function printField (row) {
    console.log(row.field_2);
  }

  function onResult (error, rows) {
    if (error) {
      callback(error);
      return;
    }
    var foundRows = rows.length > 0;
    if (foundRows) {
      rows.forEach(printField);
      // Keep walking: the next key becomes a new queue task.
      queue.push(key + 1);
    }
    callback();
  }

  con.query("SELECT * FROM test_table_2 WHERE field_2 > ?", [key], onResult);
}

// Runs when the last queued task has finished: no more queries are pending,
// so the connection can be closed and the process can exit.
// NOTE: async v3+ registers this handler with queue.drain(fn) rather than by assignment.
queue.drain = function closeConnection () {
  con.end();
};
Sign up to request clarification or add additional context in comments.

2 Comments

I like this answer, very clean :). Does the drain function take an err param? Or does queue fire an error event?
No, errors have to be handled appropriately when they occur and you can choose to allow the queue to continue, pause it, terminate it, etc. I don't think there are any baked-in assumptions about error handling. github.com/caolan/async/blob/master/lib/async.js#L731
2

I think your problem is around detecting when all of your asynchronous SQL queries have finished. I have a few ideas here.

Here's one (disclaimer: untested!) approach that doesn't change the structure of your code too much. I use allQueriesRan to track when all of your queries have been issued and I use pendingQueries as a counter to track how many queries we're still waiting on.

// Set once some chain has hit an empty result, i.e. reached its end.
var allQueriesRan = false;
// Number of queries issued whose callbacks have not run yet.
var pendingQueries = 0;

// Logs field_2 of every test_table_2 row whose field_2 is greater than `key`,
// then repeats with key + 1 until a query returns no rows. Closes the
// connection once no query is outstanding. Query errors are rethrown.
function second_query(key) {
    pendingQueries++;
    con.query('SELECT * FROM test_table_2 WHERE field_2 > ?', [key], function(err, rows) {
        pendingQueries--;

        if (err) throw err;
        if (rows.length > 0) {
            rows.forEach(function(row) {
                console.log(row.field_2);
            });

            second_query(key + 1);
        } else {
            allQueriesRan = true;
        }

        // Check only after the follow-up query (if any) has been counted;
        // checking earlier sees a transient zero, or a stale allQueriesRan.
        if (allQueriesRan && pendingQueries === 0) {
            // We've finished our recursion and we've allowed all queries to return
            con.end();
        }
    });
}

A promise library might also make your code succinct, if you're willing to descend into that rabbit hole. I like kriskowal/Q. Q for example allows you to have your recursive function return a promise object that can be "resolved" at a later time, that is, once all of your queries have returned. You can chain off of that promise with a .then() call to close your DB connection at the right time. Here's what your code might look like using this approach:

/**
 * Logs field_2 of every test_table_2 row whose field_2 is greater than `key`,
 * then repeats with key + 1 until a query returns no rows.
 *
 * @param {*} key - starting value bound to the query's `?` placeholder
 * @returns a Q promise that resolves when the chain reaches an empty result
 *   and rejects with the first query error
 */
function second_query(key) {
    // One deferred per call, so chains started from different keys settle
    // independently instead of all sharing the first chain's outcome.
    var deferred = Q.defer();

    function run(currentKey) {
        con.query('SELECT * FROM test_table_2 WHERE field_2 > ?', [currentKey], function(err, rows) {
            if (err) {
                return deferred.reject(err);
            }

            if (rows.length > 0) {
                rows.forEach(function(row) {
                    console.log(row.field_2);
                });

                run(currentKey + 1);
            } else {
                deferred.resolve();
            }
        });
    }

    run(key);
    return deferred.promise;
}

// yourKey stands for whatever key the recursion should start from.
second_query(yourKey)
    .then(function() {
        console.log('All done!');
    })
    .fail(function(err) {
        // Rethrow so the failure is not silently treated as handled.
        throw err;
    })
    .finally(function() {
        // Runs on success and on failure: always release the connection.
        con.end();
    })
    // End the chain so a rethrown error surfaces instead of being swallowed.
    .done();

Note that one handy feature of this is if a query ever returns an err, the call to deferred.reject() will short circuit your flow of execution to the .fail() handler at the very bottom.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.