Jeroen Pelgrims

Node streams in CoffeeScript

Posted in node, coffeescript, software-development

I was looking for a way to run a Google search and get the results back one by one instead of in batches of 10 per API query.
In Python I'd use generators for this, and I was looking for something similar in Node. As far as I could figure out, actual generators are only available starting from v0.11.x and thus not yet in v0.10.28, the current stable version. And even if they were, they wouldn't be in CoffeeScript yet.

The closest solution that I found was Streams. It was a bit tricky figuring these out and getting them to work in CoffeeScript, hence this post.

String/Buffer streams

By default, streams in Node work with Buffer values. (These can also be treated as strings, since calling .toString() on one converts it to a string.)

stream = require 'stream'

class CharStream extends stream.Readable
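    constructor: (@s) ->
        # Call the parent without arguments; a bare `super` would forward
        # the string to Readable as its options argument.
        super()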

    _read: ->
        for c in @s
            @push c
        @push null

class UpperCaseStream extends stream.Transform
    _transform: (chunk, enc, next) ->
        @push chunk.toString().toUpperCase()
        next()

class LogStream extends stream.Writable
    _write: (chunk, enc, next) ->
        console.log chunk.toString()
        next()

Now when we run this:

new CharStream 'ab c'
    .pipe new UpperCaseStream
    .pipe new LogStream

The console output will be:

A
B

C

  1. CharStream takes a string and its output is each character in that string, one by one.
  2. UpperCaseStream then takes the results from step 1 and transforms them to uppercase characters.
  3. Finally, LogStream takes those uppercased values and logs them to the console.

Pushing null

Note that null is pushed at the end of _read. This tells the consumer of the stream that the stream has ended.
If you don't do this, the consumer will call _read again, and because this _read pushes the whole string on every call, the stream effectively never ends. The output would then be an infinite sequence of ['a', 'b', ' ', 'c', 'a', 'b', ...]. Node calls _read again after data has been pushed, whenever the stream is ready to accept more.
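
As an illustration of that call-again behaviour, here is a minimal sketch of a variant that pushes only one character per _read call and relies on Node to ask for the next one. The class name LazyCharStream and the @index field are made up for this example, and it assumes a Node version where Readable streams call _read on demand.

```coffeescript
stream = require 'stream'

# Hypothetical variant of CharStream: one character per _read call.
class LazyCharStream extends stream.Readable
    constructor: (s) ->
        super()
        @s = s
        @index = 0

    _read: ->
        if @index < @s.length
            # After this push Node will call _read again when it wants more.
            @push @s[@index++]
        else
            # Nothing left: null signals the end of the stream.
            @push null

chunks = []
lazy = new LazyCharStream 'ab c'
lazy.on 'data', (chunk) -> chunks.push chunk.toString()
lazy.on 'end', -> console.log chunks
```

Because each call pushes at most one chunk, the stream only does as much work as the consumer asks for, and the null at the end is still what stops it.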

Object streams

If you want to use regular JavaScript objects instead of just strings, you'll need to set objectMode: true. It's important to pass this option in the call to super and not just set it on this (@).

stream = require 'stream'

class ObjectStream extends stream.Readable
    constructor: (@xs) ->
        super
            objectMode: true

    _read: ->
        for x in @xs
            @push x
        @push null

class GetPropertyStream extends stream.Transform
    constructor: (@getter) ->
        super
            objectMode: true

    _transform: (chunk, enc, next) ->
        @push @getter(chunk)
        next()

class UpperCaseStream extends stream.Transform
    _transform: (chunk, enc, next) ->
        @push chunk.toString().toUpperCase()
        next()

class LogStream extends stream.Writable
    _write: (chunk, enc, next) ->
        console.log chunk.toString()
        next()

Running the following will cause the values of the name properties to be logged in uppercase.

new ObjectStream [
        { name: 'Jake' }
        { name: 'Fred' }
        { name: 'Mark' }
        { name: 'Jeroen' }
    ]
    .pipe new GetPropertyStream (x) -> x.name
    .pipe new UpperCaseStream
    .pipe new LogStream

Result:

JAKE
FRED
MARK
JEROEN
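
In the pipeline above the objects are turned into strings before they reach LogStream. To receive the objects themselves at the end of a pipeline, the writable side needs objectMode as well. A minimal sketch, where CollectStream and @items are names made up for this example:

```coffeescript
stream = require 'stream'

# Hypothetical sink that keeps every object it receives.
class CollectStream extends stream.Writable
    constructor: ->
        super(objectMode: true)
        @items = []

    _write: (chunk, enc, next) ->
        # In object mode the chunk is the original object, not a Buffer.
        @items.push chunk
        next()

sink = new CollectStream
sink.on 'finish', -> console.log sink.items

sink.write { name: 'Jake' }
sink.write { name: 'Fred' }
sink.end()
```

The same sink could be placed at the end of a .pipe chain, directly after an object-mode Readable or Transform.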

Google search results as a stream

What I eventually ended up with for getting the search results from Google one-by-one is the following:

stream = require 'stream'
request = require 'request'

class GoogleSearchStream extends stream.Readable
    #https://developers.google.com/web-search/docs/reference

    constructor: (@query, @resultCount=8) ->
        super
            objectMode: true

        @resultsetSize = 8
        @pushed = 0
        @apiUrl = -> "https://ajax.googleapis.com/ajax/services/search/web?v=1.0&q=#{encodeURIComponent @query}&start=#{@pushed}&rsz=#{@resultsetSize}"
        @pushBlock()

    _read: ->
        #do nothing

    pushBlock: () ->
        request.get
            headers:
                referer: 'http://localhost'
            url: @apiUrl(),
            (err, response, body) =>
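                if not err and body and response.statusCode is 200
                    data = (JSON.parse body).responseData
                    results = data?.results or []

                    # An empty block means there is nothing more to fetch;
                    # without this check pushBlock would keep calling itself.
                    if results.length is 0
                        @push null
                        return

                    for result in results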
                        @pushed += 1
                        @push result

                        if @pushed >= @resultCount
                            @push null
                            return

                    @pushBlock()
                else
                    @push null

Note that the Google web search API is deprecated. This is just some example code demonstrating what I could figure out about streams. I'm sure there is a cleaner way of doing something like this, and I would love tips and improvements in the comments.
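
One possible direction, offered as a sketch rather than a definitive implementation, is to fetch a block only when _read is called, so that pages are requested on demand instead of eagerly from the constructor. Everything named here is hypothetical: PagedStream is a made-up class, and fakeFetchPage is a stand-in that simulates a paged API in memory instead of calling Google. The sketch also assumes a Node version that has readable.destroy.

```coffeescript
stream = require 'stream'

# Hypothetical stand-in for a paged API: calls back with up to `size`
# items starting at offset `start`, or an empty array past the end.
fakeFetchPage = (start, size, callback) ->
    all = ("result #{i}" for i in [0...20])
    setImmediate -> callback null, all.slice(start, start + size)

class PagedStream extends stream.Readable
    constructor: (fetchPage, limit = 8, pageSize = 8) ->
        super(objectMode: true)
        @fetchPage = fetchPage
        @limit = limit
        @pageSize = pageSize
        @pushed = 0

    # Node calls _read when the consumer wants more data and does not
    # call it again until something has been pushed, so a new page is
    # only requested on demand.
    _read: ->
        @fetchPage @pushed, @pageSize, (err, results) =>
            if err
                @destroy err
                return

            # An empty page means the source has run out.
            if results.length is 0
                @push null
                return

            for result in results
                @pushed += 1
                @push result

                if @pushed >= @limit
                    @push null
                    return

results = []
paged = new PagedStream fakeFetchPage, 10, 8
paged.on 'data', (item) -> results.push item
paged.on 'end', -> console.log results.length
```

Swapping fakeFetchPage for a function that performs a real HTTP request with the same (start, size, callback) shape would be the next step; that part is left out here because it depends on the API being used.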

This post is written by Jeroen Pelgrims, an independent software developer who runs Digicraft.eu.
