Data processing pipeline

Building an efficient data processing pipeline that remains easy to maintain over time can be quite challenging.

Models

For this article, let’s consider the following data structure, which models an accounting transaction:

{
  "balance": -3000,
  "date": "2021-09-01",
  "category": "food",
  "description": "Yummy Restaurant corp"
}

For our system, we want to apply calculation rules such as “Sum of all the negative balances (debit) for the transactions that occurred in 2021 and are labeled as ‘food’”. An additional constraint is that the amount of data to process can be substantial.

Declarative vs Imperative

A straightforward way to implement the aforementioned rule with modern JavaScript idioms would be something like the following snippet:

const isBalanceNegative = (transaction) => transaction.balance < 0;

const isIn2021 = (transaction) => new Date(transaction.date).getFullYear() === 2021;

const isCategoryFood = (transaction) => transaction.category === 'food';

const sum = (a, b) => a + b;

const getBalanceAmount = (transaction) => Math.abs(transaction.balance);

const processingRule = (transactions) => transactions
  .filter(isBalanceNegative)
  .filter(isIn2021)
  .filter(isCategoryFood)
  .map(getBalanceAmount)
  .reduce(sum, 0);

We could also have grouped the filter rules together:

const isTransactionMatching = (transaction) => [isCategoryFood, isIn2021, isBalanceNegative].every((predicate) => predicate(transaction));
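
With this combined predicate, the pipeline collapses to a single filter step (a minimal variant of the first snippet):

const processingRule = (transactions) => transactions
  .filter(isTransactionMatching)
  .map(getBalanceAmount)
  .reduce(sum, 0);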

You can read it almost like plain English: it is declarative, and the different steps are decomposed into small functions. Therefore, this implementation is easy to maintain and will probably stand the test of time.

However, this implementation may not be efficient for processing a large number of transactions: it needs to load all the transactions into memory and, semantically, creates a new array at each step of the pipeline (in practice, JavaScript engines are able to optimize this kind of code).

Beyond the potential performance problem, note that the transactions get processed in batches rather than one by one, which is not ideal either (imagine the last item of the batch fails at the first stage: no data gets processed at all).
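
To make that failure mode concrete, here is a small sketch (the strict predicate and the corrupted record are made up for the illustration): with the batch approach, a single bad item in the first stage discards the whole batch.

const strictIsBalanceNegative = (transaction) => {
  if (typeof transaction.balance !== 'number') {
    throw new Error('malformed transaction');
  }
  return transaction.balance < 0;
};

const batch = [
  {balance: -3000, date: '2021-09-01', category: 'food', description: 'Yummy Restaurant corp'},
  {balance: 'oops', date: '2021-09-02', category: 'food', description: 'corrupted record'}
];

try {
  batch.filter(strictIsBalanceNegative).map(getBalanceAmount).reduce(sum, 0);
} catch (error) {
  // the perfectly valid first transaction produces no result at all
  console.log(error.message); // 'malformed transaction'
}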

Another approach would be to write a loop, in an imperative way, processing one item at a time:

const processingRule = (transactions) => {
  let total = 0;
  for (const transaction of transactions) {
    if (isCategoryFood(transaction) && isBalanceNegative(transaction) && isIn2021(transaction)) {
      const balanceAmount = getBalanceAmount(transaction);
      total = sum(total, balanceAmount);
    }
  }
  return total;
};

While you might save some memory and processing resources, you also lose flexibility and readability. Could we get the best of both worlds?

Streams

Conceptually, we want to process the transactions one by one, as they are emitted over time. This concept is often called a stream.

Whether you refer to DOM streams or Node.js streams, you can abstract away the implementation details by considering as a (readable) stream anything that implements the async iterator protocol (or is a plain sync iterable) and can therefore be consumed with a basic for await loop. This includes simple arrays as well as actual Node.js readable streams, for example:

import {createReadStream} from 'fs';

(async () => {
  // print the chunks of a large file in nodejs
  for await (const chunk of createReadStream('./someFile.tx')) {
    console.log(chunk.toString('utf-8'));
  }

  // simple array
  const array = [1,2,3,5];
  for await (const number of array) {
    console.log(number);
  }

  // counting down numbers emitted over time (with async generator)
  const wait = (time = 500) => new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, time);
  });

  async function *sequence(limit = 10) {
    while (limit >= 0) {
      await wait();
      yield limit;
      limit--;
    }
  }

  for await (const number of sequence()) {
    console.log(number);
  }
})();

It then becomes easy to write composable transform/filter operators thanks to async generators:

const map = (mapFn) => async function* (stream) {
  for await (const item of stream) {
    yield mapFn(item);
  }
};

const filter = (predicate) => async function* (stream) {
  for await (const item of stream) {
    if (predicate(item)) {
      yield item;
    }
  }
};

// fold a stream, returning a promise
const reduce = (reducer, init = 0) => async function (stream) {
  let acc = init;
  for await (const item of stream) {
    acc = reducer(acc, item);
  }
  return acc;
};
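
Each operator takes a stream and returns a new async iterable (while reduce folds the stream into a promise), so they can also be used on their own. Here is a quick sketch reusing the predicates defined earlier on a tiny in-memory source:

(async () => {
  // a tiny in-memory source, just for the example
  const source = [
    {balance: -3000, date: '2021-09-01', category: 'food', description: 'Yummy Restaurant corp'},
    {balance: 1500, date: '2021-09-02', category: 'salary', description: 'September paycheck'}
  ];
  // only the debit transaction goes through
  for await (const transaction of filter(isBalanceNegative)(source)) {
    console.log(transaction.description); // 'Yummy Restaurant corp'
  }
})();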

Now, if we use a generic composition function, we can easily build a processing pipeline that abstracts away the data source, as long as the source matches the abstract stream interface defined above:

const pipe = (fns) => (arg) => fns.reduce((x, f) => f(x), arg);
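// a quick sanity check of how the composition works (plain numeric functions, just for the example):
// pipe([(x) => x + 1, (x) => x * 2])(3); // -> 8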

const pipeline = pipe([
  filter(isTransactionMatching),
  map(getBalanceAmount),
  reduce(sum, 0)
]);

We managed to write our code in a declarative way while processing the transactions one by one; interestingly, the pipeline does not depend on the data source:

const transactions = [{
  balance: -3000,
  date: '2021-09-01',
  category: 'food',
  description: 'Yummy Restaurant corp'
}, /* ... */];

// from an array
pipeline(transactions).then(console.log).catch(console.log);

// from a database cursor
pipeline(Transaction.find()).then(console.log).catch(console.log);

// etc
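
Any async iterable can feed the same pipeline. As an illustration, here is a hypothetical paginated source written as an async generator (fetchTransactionsPage is an assumed helper that resolves with an array of transactions, and eventually with an empty array when there is no more data):

// from a hypothetical paginated API
async function *paginatedTransactions(fetchPage) {
  let page = 0;
  let results;
  do {
    results = await fetchPage(page);
    yield* results;
    page++;
  } while (results.length > 0);
}

// fetchTransactionsPage is a hypothetical function: (pageNumber) => Promise<Array<transaction>>
pipeline(paginatedTransactions(fetchTransactionsPage)).then(console.log).catch(console.log);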

Conclusion

While the async iterator interface is a good way to model the concept of a readable stream in an abstract way, async generator functions make it easy to define generic processing steps in a declarative way, using function composition. This approach lets us combine the best of both programming paradigms: declarative and imperative code.

N.B.: the content of this post is drawn from a talk given at the lyon-js meetup we held in our office.
