Timely Dataflow

In this book we will work through the motivation and technical details behind timely dataflow, which is both a system for implementing distributed streaming computation, and if you look at it right, a way to structure computation generally.

Timely dataflow arose from work at Microsoft Research, where a group of us worked on building scalable, distributed data processing platforms. Our experience was that other systems did not provide both expressive computation and high performance. Efficient systems would only let you write restricted programs, and expressive systems employed synchronous and otherwise inefficient execution.

Our goal was to provide a not-unpleasant experience where you could write sophisticated streaming computations (e.g. with iterative control flow), which nonetheless compile down to systems that execute with only a modicum of overhead and synchronization.

Motivation

Let's start with some motivation: what can you do with timely dataflow, and when should you be excited to use it as opposed to other programming frameworks? Is timely dataflow great for everything, or is it only great for a few things? Is it great for anything? We will try and clarify these questions in this section.

A simplest example

Let's start with what may be the simplest non-trivial timely dataflow program.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

This program gives us a bit of a flavor for what a timely dataflow program might look like, including a bit of what Rust looks like, without getting too bogged down in weird stream processing details. Not to worry; we will do that in just a moment!

If we run the program up above, we see it print out the numbers zero through nine.

    Echidnatron% cargo run --example simple
        Finished dev [unoptimized + debuginfo] target(s) in 0.05s
         Running `target/debug/examples/simple`
    seen: 0
    seen: 1
    seen: 2
    seen: 3
    seen: 4
    seen: 5
    seen: 6
    seen: 7
    seen: 8
    seen: 9
    Echidnatron%

This isn't very different from a Rust program that would do this much more simply, namely the program

fn main() {
    (0..10).for_each(|x| println!("seen: {:?}", x));
}

Why would we want to make our life so complicated? The main reason is that we can make our program reactive, so that we can run it without knowing ahead of time the data we will use, and it will respond as we produce new data.

An example

Timely dataflow means to capture a large number of idioms, so it is a bit tricky to wrap together one example that shows off all of its features, but let's look at something that shows off some core functionality to give a taste.

The following complete program initializes a timely dataflow computation, in which participants can supply a stream of numbers which are exchanged between the workers based on their value. Workers print to the screen when they see numbers. You can also find this as examples/hello.rs in the timely dataflow repository.

extern crate timely;

use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();

        // create a new input, exchange data, and inspect its output
        let probe = worker.dataflow(|scope|
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe()
        );

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

We can run this program in a variety of configurations: with just a single worker thread, with one process and multiple worker threads, and with multiple processes each with multiple worker threads.

To try this out yourself, first clone the timely dataflow repository using git

    Echidnatron% git clone https://github.com/TimelyDataflow/timely-dataflow
    Cloning into 'timely-dataflow'...
    remote: Counting objects: 14254, done.
    remote: Compressing objects: 100% (2267/2267), done.
    remote: Total 14254 (delta 2625), reused 3824 (delta 2123), pack-reused 9856
    Receiving objects: 100% (14254/14254), 9.01 MiB | 1.04 MiB/s, done.
    Resolving deltas: 100% (10686/10686), done.

Now cd into the directory and build timely dataflow by typing

    Echidnatron% cd timely-dataflow
    Echidnatron% cargo build
        Updating registry `https://github.com/rust-lang/crates.io-index`
    Compiling timely_sort v0.1.6
    Compiling byteorder v0.4.2
    Compiling libc v0.2.29
    Compiling abomonation v0.4.5
    Compiling getopts v0.2.14
    Compiling time v0.1.38
    Compiling timely_communication v0.1.7
    Compiling timely v0.2.0 (file:///Users/mcsherry/Projects/temporary/timely-dataflow)
        Finished dev [unoptimized + debuginfo] target(s) in 6.37 secs

Now we build the hello example

    Echidnatron% cargo build --example hello
    Compiling rand v0.3.16
    Compiling timely v0.2.0 (file:///Users/mcsherry/Projects/temporary/timely-dataflow)
        Finished dev [unoptimized + debuginfo] target(s) in 6.35 secs

And finally we run the hello example

    Echidnatron% cargo run --example hello
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello`
    worker 0:	hello 0
    worker 0:	hello 1
    worker 0:	hello 2
    worker 0:	hello 3
    worker 0:	hello 4
    worker 0:	hello 5
    worker 0:	hello 6
    worker 0:	hello 7
    worker 0:	hello 8
    worker 0:	hello 9
    Echidnatron%

Rust is relatively clever, and we could have skipped the cargo build and cargo build --example hello commands; just invoking cargo run --example hello will build (or rebuild) anything necessary.

Of course, we can run this with multiple workers using the -w or --workers flag, followed by the number of workers we want in the process. Notice that you'll need an -- before the arguments to our program; any arguments before that are treated as arguments to the cargo command.

    Echidnatron% cargo run --example hello -- -w2
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -w2`
    worker 0:	hello 0
    worker 1:	hello 1
    worker 0:	hello 2
    worker 1:	hello 3
    worker 0:	hello 4
    worker 1:	hello 5
    worker 0:	hello 6
    worker 1:	hello 7
    worker 0:	hello 8
    worker 1:	hello 9
    Echidnatron%

Although you can't easily see this happening, timely dataflow has spun up two worker threads and together they have exchanged some data and printed the results as before. However, notice that the worker index is now varied; this is our only clue that different workers exist, and processed different pieces of data. Worker zero introduces all of the data (notice the guard in the code; without this each worker would introduce 0 .. 10), and then it is shuffled between the workers. The only guarantee is that records that evaluate to the same integer in the exchange closure go to the same worker. In practice, we (currently) route records based on the remainder of the number when divided by the number of workers.

Finally, let's run with multiple processes. To do this, you use the -n and -p arguments, which tell each process how many total processes to expect (the -n parameter) and which index this process should identify as (the -p parameter). You can also use -h to specify a host file with names and ports of each of the processes involved, but if you leave it off timely defaults to using the local host.

In one shell, I'm going to start a computation that expects multiple processes. It will hang out waiting for the other processes to start up.

    Echidnatron% cargo run --example hello -- -n2 -p0
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -n2 -p0`

Now if we head over to another shell, we can type the same thing but with a different -p identifier.

    Echidnatron% cargo run --example hello -- -n2 -p1
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -n2 -p1`
    worker 1:	hello 1
    worker 1:	hello 3
    worker 1:	hello 5
    worker 1:	hello 7
    worker 1:	hello 9
    Echidnatron%

Wow, fast! And, we get to see some output too. Only the output for this worker, though. If we head back to the other shell we see the process got moving and produced the other half of the output.

    Echidnatron% cargo run --example hello -- -n2 -p0
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -n2 -p0`
    worker 0:	hello 0
    worker 0:	hello 2
    worker 0:	hello 4
    worker 0:	hello 6
    worker 0:	hello 8
    Echidnatron%

This may seem only slightly interesting so far, but we will progressively build up more interesting tools and more interesting computations, and see how timely dataflow can efficiently execute them for us.

When to use Timely Dataflow

Timely dataflow may be a different programming model than you are used to, but if you can adapt your program to it there are several benefits.

  • Data Parallelism: The operators in timely dataflow are largely "data-parallel", meaning they can operate on independent parts of the data concurrently. This allows the underlying system to distribute timely dataflow computations across multiple parallel workers. These can be threads on your computer, or even threads across computers in a cluster you have access to. This distribution typically improves the throughput of the system, and lets you scale to larger problems with access to more resources (computation, communication, and memory).

  • Streaming Data: The core data type in timely dataflow is a stream of data, an unbounded collection of data not all of which is available right now, but which instead arrives as the computation proceeds. Streams are a helpful generalization of static data sets, which are assumed available at the start of the computation. By expressing your program as a computation on streams, you've explained both how it should respond to static input data sets (feed all the data in at once) but also how it should react to new data that might arrive later on.

  • Expressivity: Timely dataflow's main addition over traditional stream processors is its ability to express higher-level control constructs, like iteration. This moves stream computations from the limitations of straight line code to the world of algorithms. Many of the advantages of timely dataflow computations come from our ability to express a more intelligent algorithm than the alternative systems, which can only express more primitive computations.

There are many domains where streaming and scalability are important, and I'm not sure I can name them all. If you would like to build a scalable monitoring application for a service you run, timely dataflow can express this. If you would like to work with big data computations processing more data than your computer can load into memory, timely dataflow streams can represent this efficiently. If you would like to build an incremental iterative computation over massive data (e.g. matrices, large graphs, text corpora), timely dataflow has done these things.

At the same time, dataflow computation is also another way of thinking about your program. Much like Rust causes you to think a bit harder about program structure, timely dataflow helps you tease out some structure to your program that results in a more effective computation. Even when writing something like grep, a program that scans lines of text looking for patterns, by stating your program as a dataflow computation its implementation immediately scales out to multiple threads, and even across multiple computers.

Generality

Is timely dataflow always applicable? The intent of this research project is to remove layers of abstraction fat that prevent you from expressing anything your computer can do efficiently in parallel.

Under the covers, your computer (the one on which you are reading this text) is a dataflow processor. When your computer reads memory it doesn't actually wander off to find the memory, it introduces a read request into your memory controller, an independent component that will eventually return with the associated cache line. Your computer then gets back to work on whatever it was doing, hoping the responses from the controller return in a timely fashion.

Academically, I treat "my computer can do this, but timely dataflow cannot" as a bug. There are degrees, of course, and timely dataflow isn't on par with the processor's custom hardware designed to handle low level requests efficiently, but algorithmically, the goal is that anything you can do efficiently with a computer you should be able to express in timely dataflow.

When not to use Timely Dataflow

There are several reasons not to use timely dataflow, though many of them are friction about how your problem is probably expressed, rather than fundamental technical limitations. There are fundamental technical limitations too, of course.

I've collected a few examples here, but the list may grow with input and feedback.


Timely dataflow is a dataflow system, and this means that at its core it likes to move data around. This makes life complicated when you would prefer not to move data, and instead move things like pointers and references to data that otherwise stays put.

For example, sorting a slice of data is a fundamental task and one that parallelizes. But, the task of sorting is traditionally viewed as transforming the data in a supplied slice, rather than sending the data to multiple workers and then announcing that it got sorted. The data really does need to end up in one place, one single pre-existing memory allocation, and timely dataflow is not great at problems that cannot be recast as the movement of data.

One could re-imagine the sorting process as moving data around, and indeed this is what happens when large clusters need to be brought to bear on such a task, but that doesn't help you at all if what you needed was to sort your single allocation. A library like Rayon would almost surely be better suited to the task.


Dataflow systems are also fundamentally about breaking apart the execution of your program into independently operating parts. However, many programs are correct only because some things happen before or after other things. A classic example is depth-first search in a graph: although there is lots of work to do on small bits of data, it is crucial that the exploration of nodes reachable along a graph edge complete before the exploration of nodes reachable along the next graph edge.

Although there is plenty of active research on transforming algorithms from sequential to parallel, if you aren't clear on how to express your program as a dataflow program then timely dataflow may not be a great fit. At the very least, the first step would be "fundamentally re-imagine your program", which can be a fine thing to do, but is perhaps not something you would have to do with your traditional program.


Timely dataflow is in a bit of a weird space between language library and runtime system. This means that it doesn't quite have the stability guarantees a library might have (when you call data.sort() you don't think about "what if it fails?"), nor does it have the surrounding infrastructure of a DryadLINQ or Spark style of experience. Part of this burden is simply passed to you, and this may be intolerable depending on your goals for your program.

Chapter 1: Core Concepts

Timely dataflow relies on two fundamental concepts: timestamps and dataflow, which together lead to the concept of progress. We will want to break down these concepts because they play a fundamental role in understanding how timely dataflow programs are structured.

Dataflow

Dataflow programming is fundamentally about describing your program as independent components, each of which operate in response to the availability of input data, as well as describing the connections between these components.

The most important part of dataflow programming is the independence of the components. When you write a dataflow program, you provide the computer with flexibility in how it executes your program. Rather than insisting on a specific sequence of instructions the computer should follow, the computer can work on each of the components as it sees fit, perhaps even sharing the work with other computers.

Timestamps

While we want to enjoy the benefits of dataflow programming, we still need to understand whether and how our computation progresses. In traditional imperative programming we could reason that because instructions happen in some order, then once we reach a certain point all work (of a certain type) must be done. Instead, we will tag the data that move through our dataflow with timestamps, indicating (roughly) when they would have happened in a sequential execution.

Timestamps play at least two roles in timely dataflow: they allow dataflow components to make sense of the otherwise unordered inputs they see ("ah, I received the data in this order, but I should behave as if it arrived in this order"), and they allow the user (and others) to reason about whether they have seen all of the data with a certain timestamp.

Timestamps allow us to introduce sequential structure into our program, without requiring actual sequential execution.

Progress

In a traditional imperative program, if we want to return the maximum of a set of numbers, we just scan all the numbers and return the maximum. We don't have to worry about whether we've considered all of the numbers yet, because the program makes sure not to provide an answer until it has consulted each number.

This simple task is much harder in a dataflow setting, where numbers arrive as input to a component that is tracking the maximum. Before releasing a number as output, the component must know if it has seen everything, as one more value could change its answer. But strictly speaking, nothing we've said so far about dataflow or timestamps provide any information about whether more data might arrive.

If we combine dataflow program structure with timestamped data in such a way that as data move along the dataflow their timestamps only increase, we are able to reason about the progress of our computation. More specifically, at any component in the dataflow, we can reason about which timestamps we may yet see in the future. Timestamps that are no longer possible are considered "passed", and components can react to this information as they see fit.

Continual information about the progress of a computation is the only basis of coordination in timely dataflow, and is the lightest touch we could think of.

Dataflow Programming

Dataflow programming is fundamentally about describing your program as independent components, each of which operate in response to the availability of input data, as well as describing the connections between these components. This has several advantages, mostly in how it allows a computer to execute your program, but it can take a bit of thinking to re-imagine your imperative computation as a dataflow computation.

An example

Let's write an overly simple dataflow program. Remember our examples/hello.rs program? We are going to revisit that, but with some timestamp aspects removed. The goal is to get a sense for dataflow with all of its warts, and to get you excited for the next section where we bring back the timestamps. :)

Here is a reduced version of examples/hello.rs that just feeds data into our dataflow, without paying any attention to progress made. In particular, we have removed the probe() operation, the resulting probe variable, and the use of probe to determine how long we should step the worker before introducing more data.

#![allow(unused_variables)]
extern crate timely;

use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();

        // create a new input, exchange data, and inspect its output
        let probe = worker.dataflow(|scope|
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe()
        );

        // introduce data and watch!
        for round in 0..10 {
            if worker.index() == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            // worker.step_while(|| probe.less_than(input.time()));
        }
    }).unwrap();
}

This program is a dataflow program. There are two dataflow operators here, exchange and inspect, each of which is asked to do a thing in response to input data. The exchange operator takes each datum and hands it to a downstream worker based on the value it sees; with two workers, one will get all the even numbers and the other all the odd numbers. The inspect operator takes an action for each datum, in this case printing something to the screen.

Importantly, we haven't imposed any constraints on how these operators need to run. We removed the code that caused the input to be delayed until a certain amount of progress had been made, and it shows in the results when we run with more than one worker:

    Echidnatron% cargo run --example hello -- -w2
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -w2`
    worker 1:	hello 1
    worker 1:	hello 3
    worker 0:	hello 0
    worker 1:	hello 5
    worker 0:	hello 2
    worker 1:	hello 7
    worker 1:	hello 9
    worker 0:	hello 4
    worker 0:	hello 6
    worker 0:	hello 8
    Echidnatron%

What a mess. Nothing in our dataflow program requires that workers zero and one alternate printing to the screen, and you can even see that worker one is done before worker zero even gets to printing hello 4.


However, this is only a mess if we are concerned about the order, and in many cases we are not. Imagine instead of just printing the number to the screen, we want to find out which numbers are prime and print them to the screen.

.inspect(|x| {
    // we only need to test factors up to sqrt(x)
    let limit = (*x as f64).sqrt() as u64;
    if *x > 1 && (2 .. limit + 1).all(|i| x % i > 0) {
        println!("{} is prime", x);
    }
})

We don't really care that much about the order (we just want the results), and we have written such a simple primality test that we are going to be thrilled if we can distribute the work across multiple cores.

Let's check out the time to print out the prime numbers up to 10,000 using one worker:

    Echidnatron% time cargo run --example hello -- -w1 > output1.txt
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -w1`
    cargo run --example hello -- -w1 > output1.txt  59.84s user 0.10s system 99% cpu 1:00.01 total
    Echidnatron%

And now again with two workers:

    Echidnatron% time cargo run --example hello -- -w2 > output2.txt
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -w2`
    cargo run --example hello -- -w2 > output2.txt  60.74s user 0.12s system 196% cpu 30.943 total
    Echidnatron%

The time is basically halved, from one minute to thirty seconds, which is a great result for those of us who like factoring small numbers. Furthermore, although the 1,262 lines of results of output1.txt and output2.txt are not in the same order, it takes a fraction of a second to make them so, and verify that they are identical:

    Echidnatron% sort output1.txt > sorted1.txt
    Echidnatron% sort output2.txt > sorted2.txt
    Echidnatron% diff sorted1.txt sorted2.txt
    Echidnatron%

This is probably as good a time as any to tell you about Rust's --release flag. I haven't been using it up above to keep things simple, but adding the --release flag to cargo's arguments makes the compilation take a little longer, but the resulting program run a lot faster. Let's do that now, to get a sense for how much of a difference it makes:

    Echidnatron% time cargo run --release --example hello -- -w1 > output1.txt
        Finished release [optimized] target(s) in 0.0 secs
        Running `target/release/examples/hello -w1`
    cargo run --release --example hello -- -w1 > output1.txt  0.78s user 0.06s system 96% cpu 0.881 total
    Echidnatron% time cargo run --release --example hello -- -w2 > output2.txt
        Finished release [optimized] target(s) in 0.0 secs
        Running `target/release/examples/hello -w2`
    cargo run --release --example hello -- -w2 > output2.txt  0.73s user 0.05s system 165% cpu 0.474 total

That is about a 60x speed-up. The good news is that we are still getting approximately a 2x speed-up going from one worker to two, but you can see that dataflow programming does not magically extract all performance from your computer.

This is also a fine time to point out that dataflow programming is not religion. There is an important part of our program up above that is imperative:

    let limit = (*x as f64).sqrt() as u64;
    if *x > 1 && (2 .. limit + 1).all(|i| x % i > 0) {
        println!("{} is prime", x);
    }

This is an imperative fragment telling the inspect operator what to do. We could write this as a dataflow fragment if we wanted, but it is frustrating to do so, and less efficient. The control flow fragment lets us do something important, something that dataflow is bad at: the all method above stops as soon as it sees a factor of x.

There is a time and a place for dataflow programming and for control flow programming. We are going to try and get the best of both.

Logical Timestamps

When dataflow programs move data around arbitrarily, it becomes hard to correlate the produced outputs with the supplied inputs. If we supply a stream of bank transactions as input, and the output is a stream of bank balances, how can we know which input transactions are reflected in which output balances?

The standard approach to this problem is to install timestamps on the data. Each record gets a logical timestamp associated with it that indicates when it should be thought to happen. This is not necessarily "when" in terms of the date, time, or specific nanosecond the record was emitted; a timestamp could simply be a sequence number identifying a batch of input records. Or, and we will get into the terrifying details later, it could be much more complicated than this.

Timestamps are what allow us to correlate inputs and outputs. When we introduce records with some logical timestamp, unless our dataflow computation changes the timestamps, we expect to see corresponding outputs with that same timestamp.

An example

Remember from the dataflow section how when we remove the coordination from our examples/hello.rs program, the output was produced in some horrible order? In fact, each of those records had a timestamp associated with it that would reveal the correct order; we just weren't printing the timestamp because inspect doesn't have access to it.

Let's change the program to print out the timestamp with each record. This shouldn't be a very thrilling output, because the timestamp is exactly the same as the number itself, but that didn't have to be the case. We are just going to replace the line

.inspect(move |x| println!("worker {}:\thello {}", index, x))

with a slightly more complicated operator, inspect_batch.

.inspect_batch(move |t,xs| {
    for x in xs.iter() {
        println!("worker {}:\thello {} @ {:?}", index, x, t)
    }
})

The inspect_batch operator gets lower-level access to data in timely dataflow, in particular access to batches of records with the same timestamp. It is intended for diagnosing system-level details, but we can also use it to see what timestamps accompany the data.

The output we get with two workers is now:

    Echidnatron% cargo run --example hello -- -w2
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/hello -w2`
    worker 1:	hello 1 @ (Root, 1)
    worker 1:	hello 3 @ (Root, 3)
    worker 1:	hello 5 @ (Root, 5)
    worker 0:	hello 0 @ (Root, 0)
    worker 0:	hello 2 @ (Root, 2)
    worker 0:	hello 4 @ (Root, 4)
    worker 0:	hello 6 @ (Root, 6)
    worker 0:	hello 8 @ (Root, 8)
    worker 1:	hello 7 @ (Root, 7)
    worker 1:	hello 9 @ (Root, 9)
    Echidnatron%

The timestamps are the (Root, i) things for various values of i. These happen to correspond to the data themselves, but had we provided random input data rather than i itself we would still be able to make sense of the output and put it back "in order".

Timestamps for dataflow operators

Timestamps are not only helpful for dataflow users, but for the operators themselves. With time we will start to write more interesting dataflow operators, and it may be important for them to understand which records should be thought to come before others.

Imagine, for example, a dataflow operator whose job is to report the "sum so far", where "so far" should be with respect to the timestamp (as opposed to whatever arbitrary order the operator receives the records). Such an operator can't simply take its input records, add them to a total, and produce the result. The input records may no longer be ordered by timestamp, and the produced summations may not reflect any partial sum of the input. Instead, the operator needs to look at the timestamps on the records, and incorporate the numbers in order of their timestamps.

Of course, such an operator works great as long as it expects exactly one record for each timestamp. Things get harder for it if it might receive multiple records at each timestamp, or perhaps none. To address this, the underlying system will have to help the operator reason about the progress of its input, up next.

Tracking Progress

Both dataflow and timestamps are valuable in their own right, but when we bring them together we get something even better. We get the ability to reason about the flow of timestamps through our computation, and we recover the ability to inform each dataflow component about how much of its input data it has seen.

Let's recall that bit of code we commented out from examples/hello.rs, which had to do with consulting something named probe.

extern crate timely;

use timely::dataflow::InputHandle;
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();

        // create a new input, exchange data, and inspect its output
        let probe = worker.dataflow(|scope|
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe()
        );

        // introduce data and watch!
        for round in 0..10 {
            if worker.index() == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            worker.step_while(|| probe.less_than(input.time()));
        }
    }).unwrap();
}

We'll put the whole program up here, but there are really just two lines that deal with progress tracking:

input.advance_to(round + 1);
worker.step_while(|| probe.less_than(input.time()));

Let's talk about each of them.

Input capabilities

The input structure is how we provide data to a timely dataflow computation, and it has a timestamp associated with it. Initially this timestamp is the default value, usually something like 0 for integers. Whatever timestamp input has, it can introduce data with that timestamp or greater. We can advance this timestamp, via the advance_to method, which restricts the timestamps we can use to those greater or equal to whatever timestamp is supplied as the argument.

The advance_to method is a big deal. This is the moment in the computation where our program reveals to the system, and through the system to all other dataflow workers, that we might soon be able to announce a timestamp as complete. There may still be records in flight bearing that timestamp, but as they are retired the system can finally report that progress has been made.

Output possibilities

The probe structure is how we learn about the possibility of timestamped data at some point in the dataflow graph. We can, at any point, consult a probe with the less_than method and ask whether it is still possible that we might see a time less than the argument at that point in the dataflow graph. There is also a less_equal method, if you prefer that.

Putting a probe after the inspect operator, which passes through all data it receives as input only after invoking its method, tells us whether we should expect to see the method associated with inspect fire again for a given timestamp. If we are told we won't see any more messages with timestamp t after the inspect, then the inspect won't see any either.

The less_than and less_equal methods are the only place where we learn about the state of the rest of the system. These methods are non-blocking; they always return immediately with either a "yes, you might see such a timestamp" or a "no, you will not see such a timestamp".

Responding to progress information

Progress information is relatively passive. We get to observe what happens in the rest of the system, and perhaps change our behavior based on the amount of progress. We do not get to tell the system what to do next, we just get to see what has happened since last we checked.

This passive approach to coordination allows the system to operate with minimal overhead. Workers exchange both data and progress information. If workers want to wait for further progress before introducing more data they see they are welcome to do so, but they can also go and work on a different part of the dataflow graph as well.

Progress information provides a relatively unopinionated view of coordination. Workers are welcome to impose a more synchronous discipline using progress information, perhaps proceeding in sequence through operators by consulting probes installed after each of them, but they are not required to do so. Synchronization is possible, but it becomes a choice made by the workers themselves, rather than imposed on them by the system.

Building Timely Dataflows

Let's talk about how to create timely dataflows.

This section will be a bit of a tour through the dataflow construction process, ignoring for the moment details about the interesting ways in which you can get data into and out of your dataflow; those will show up in the "Running Timely Dataflows" section. For now we are going to work with examples with fixed input data and no interactivity to speak of, focusing on what we can cause to happen to that data.

Here is a relatively simple example, taken from timely/examples/simple.rs, that turns the numbers zero through nine into a stream, and then feeds them through an inspect operator printing them to the screen.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

We are going to develop out this example, showing off both the built-in operators as well as timely's generic operator construction features.


NOTE: Timely very much assumes that you are going to build the same dataflow on each worker. You don't literally have to, in that you could build a dataflow from user input, or with a random number generator, things like that. Please don't! It will not be a good use of your time.

Creating Inputs

Let's start with the first thing we'll want for a dataflow computation: a source of data.

Almost all operators in timely can only be defined from a source of data, with a few exceptions. One of these exceptions is the to_stream operator, which is defined for various types and which takes a scope as an argument and produces a stream in that scope. Our InputHandle type from previous examples has a to_stream method, as well as any type that can be turned into an iterator (which we used in the preceding example).

For example, we can create a new dataflow with one interactive input and one static input:

extern crate timely;

use timely::dataflow::InputHandle;
use timely::dataflow::operators::ToStream;

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let mut input = InputHandle::<(), String>::new();

        // define a new dataflow
        worker.dataflow(|scope| {

            let stream1 = input.to_stream(scope);
            let stream2 = (0 .. 9).to_stream(scope);

        });

    }).unwrap();
}

There will be more to do to get data into input, and we aren't going to worry about that at the moment. But, now you know two of the places you can get data from!

Other sources

There are other sources of input that are a bit more advanced. Once we learn how to create custom operators, the source method will allow us to create a custom operator with zero input streams and one output stream, which looks like a source of data (hence the name). There are also the Capture and Replay traits that allow us to exfiltrate the contents of a stream from one dataflow (using capture_into) and re-load it in another dataflow (using replay_into).

Observing Outputs

Having constructed a minimal streaming computation, we might like to take a peek at the output. There are a few ways to do this, but the simplest by far is the inspect operator.

The inspect operator is called with a closure, and it ensures that the closure is run on each record that passes through the operator. This closure can do just about anything, from printing to the screen or writing to a file.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 9)
                .to_stream(scope)
                .inspect(|x| println!("hello: {}", x));
        });
    }).unwrap();
}

This simple example turns the sequence zero through nine into a stream and then prints the results to the screen.

Inspecting Batches

The inspect operator has a big sibling, inspect_batch, whose closure gets access to whole batches of records at a time, just like the underlying operator. More precisely, inspect_batch takes a closure of two parameters: first, the timestamp of a batch, and second a reference to the batch itself. The inspect_batch operator can be especially helpful if you want to process the outputs more efficiently.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 10)
                .to_stream(scope)
                .inspect_batch(|t, xs| println!("hello: {:?} @ {:?}", xs, t));
        });
    }).unwrap();
}

Capturing Streams

The Capture trait provides a mechanism for exfiltrating a stream from a dataflow, into information that can be replayed in other dataflows. The trait is pretty general, and can even capture a stream to a binary writer that can be read back from to reconstruct the stream (see examples/capture_send.rs and examples/capture_recv.rs).

The simplest form of capture is the capture() method, which turns the stream into a shared queue of "events", which are the sequence of events the operator is exposed to: data arriving and notification of progress through the input stream. The capture method is used in many of timely's documentation tests, to extract a stream and verify that it is correct.

Consider the documentation test for the ToStream trait:

extern crate timely;

use timely::dataflow::operators::{ToStream, Capture};
use timely::dataflow::operators::capture::Extract;

fn main() {
    let (data1, data2) = timely::example(|scope| {
        let data1 = (0..3).to_stream(scope).capture();
        let data2 = vec![0,1,2].to_stream(scope).capture();
        (data1, data2)
    });

    assert_eq!(data1.extract(), data2.extract());
}

Here the two capture methods each return the receive side of one of Rust's threadsafe channels. The data moving along the channel have a type capture::Event<T,D> which you would need to read about, but which your main thread can drain out of the channel and process as it sees fit.

Introducing Operators

In between introducing streams of data and inspecting or capturing the output, we'll probably want to do some computation on those data. There are a lot of things that you can do, and timely comes with a set of generally useful operators built in. We will survey a few of these, but this list will be necessarily incomplete: the operators are pretty easy to write, and keep showing up.

Mapping

One of the simplest things one can do with a stream of data is to transform each record into a new record. In database terminology this would be called "projection", where you extract some fields from a larger record, but as we are in a more rich programming language we can perform arbitrary transformations.

The map operator takes as an argument a closure from the input data type to an output data type that you get to define. The result is the stream of records corresponding to the application of your closure to each record in the input stream.

The following program should print out the numbers one through ten.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 9)
                .to_stream(scope)
                .map(|x| x + 1)
                .inspect(|x| println!("hello: {}", x));
        });
    }).unwrap();
}

The closure map takes owned data as input, which means you are able to mutate it as you like without cloning or copying the data. For example, if you have a stream of String data, then you could upper-case the string contents without having to make a second copy; your closure owns the data that comes in, with all the benefits that entails.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 9)
                .to_stream(scope)
                .map(|x| x.to_string())
                .map(|mut x| { x.truncate(5); x } )
                .inspect(|x| println!("hello: {}", x));
        });
    }).unwrap();
}

Map variants

There are a few variants of map with different functionality.

For example, the map_in_place method takes a closure which receives a mutable reference and produces no output; instead, this method allows you to change the data in-place, which can be a valuable way to avoid duplication of resources.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 9)
                .to_stream(scope)
                .map(|x| x.to_string())
                .map_in_place(|x| x.truncate(5))
                .inspect(|x| println!("hello: {}", x));
        });
    }).unwrap();
}

Alternately, the flat_map method takes input data and allows your closure to transform each element to an iterator, which it then enumerates into the output stream. The following fragment takes each number from zero through eight and has each produce all numbers less than it. The result should be 8 zeros, 7 ones, and so on up to 1 seven.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Map};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 9)
                .to_stream(scope)
                .flat_map(|x| 0 .. x)
                .inspect(|x| println!("hello: {}", x));
        });
    }).unwrap();
}

Filtering

Another fundamental operation is filtering, in which a predicate dictates a subset of the stream to retain.

extern crate timely;

use timely::dataflow::operators::{ToStream, Inspect, Filter};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        worker.dataflow::<(),_,_>(|scope| {
            (0 .. 9)
                .to_stream(scope)
                .filter(|x| *x % 2 == 0)
                .inspect(|x| println!("hello: {}", x));
        });
    }).unwrap();
}

Unlike map, the predicate passed to the filter operator does not receive owned data, but rather a reference to the data. This allows filter to observe the data to determine whether to keep it, but not to change it.

Logical Partitioning

There are two operators for splitting and combining streams, partition and concat respectively.

The partition operator takes two arguments, a number of resulting streams to produce, and a closure which takes each record to a pair of the target partition identifier and the output record. The output of partition is a list of streams, where each stream contains those elements mapped to the stream under the closure.

extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Inspect};

fn main() {
    timely::example(|scope| {
        let streams = (0..10).to_stream(scope)
                             .partition(3, |x| (x % 3, x));

        streams[0].inspect(|x| println!("seen 0: {:?}", x));
        streams[1].inspect(|x| println!("seen 1: {:?}", x));
        streams[2].inspect(|x| println!("seen 2: {:?}", x));
    });
}

This example breaks the input stream apart into three logical streams, which are then subjected to different inspect operators. Importantly, partition only logically partitions the data, it does not move the data between workers. In the example above, each worker partitions its stream into three parts and no data are exchanged at all (as inspect does not require that of its inputs).

In the other direction, concat takes two streams and produces one output stream containing elements sent along either. The following example merges the partitioned streams back together.

extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Concat, Inspect};

fn main() {
    timely::example(|scope| {
        let streams = (0..10).to_stream(scope)
                             .partition(3, |x| (x % 3, x));
        streams[0]
            .concat(&streams[1])
            .concat(&streams[2])
            .inspect(|x| println!("seen: {:?}", x));
    });
}

There is also a concatenate method defined for scopes which collects all streams from a supplied vector, effectively undoing the work of partition in one operator.

extern crate timely;

use timely::dataflow::operators::{ToStream, Partition, Concatenate, Inspect};

fn main() {
    timely::example(|scope| {
        let streams = (0..10).to_stream(scope)
                             .partition(3, |x| (x % 3, x));

        scope.concatenate(streams)
             .inspect(|x| println!("seen: {:?}", x));
    });
}

Both concat and concatenate are efficient operations that exchange no data between workers, operate only on batches of stream elements, and do not make further copies of the data.

Physical Partitioning

To complement the logical partitioning of partition, timely also provides the physical partitioning operator exchange which routes records to a worker based on a supplied closure. The exchange operator does not change the contents of the stream, but rather the distribution of elements to the workers. This operation can be important if you would like to collect records before printing statistics to the screen, or otherwise do some work that requires a specific data distribution.

Operators that require a specific data distribution will ensure that this occurs as part of their definition. As the programmer, you should not need to invoke exchange.

There are times where exchange can be useful. For example, if a stream is used by two operators requiring the same distribution, simply using the stream twice will cause duplicate data exchange as each operator satisfies its requirements. Instead, it may make sense to invoke exchange to move the data once, at which point the two operators will no longer require serialization and communication to shuffle their inputs appropriately.

Other operators

There are any number of other operators, most of which you should be able to find in the timely::dataflow::operators module. Scanning through the documentation for this module may lead you to operators that you need, and alternately their implementations may demonstrate how to construct similar operators, if the one you require is not present. Operator construction is the subject of the next section!

Creating Operators

What if there isn't an operator that does what you want to do? What if what you want to do is better written as imperative code rather than a tangle of dataflow operators? Not a problem! Timely dataflow has you covered.

Timely has several "generic" dataflow operators that are pretty much ready to run, except someone (you) needs to supply their implementation. This isn't as scary as it sounds; you just need to write a closure that says "given a handle to my inputs and outputs, what do I do when timely asks me to run?".

Let's look at an example

extern crate timely;

use timely::dataflow::operators::ToStream;
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {
        (0u64..10)
            .to_stream(scope)
            .unary(Pipeline, "increment", |capability, info| {

                let mut vector = Vec::new();
                move |input, output| {
                    while let Some((time, data)) = input.next() {
                        data.swap(&mut vector);
                        let mut session = output.session(&time);
                        for datum in vector.drain(..) {
                            session.give(datum + 1);
                        }
                    }
                }
            })
            .container::<Vec<_>>();
    });
}

What is going on here? The heart of the mess is the dataflow operator unary, which is a ready-to-assemble dataflow operator with one input and one output. The unary operator takes three arguments (it looks like so many more!): (i) instructions about how it should distribute its inputs, (ii) a tasteful name, and (iii) the logic it should execute whenever timely gives it a chance to do things.

Most of what is interesting lies in the closure, so let's first tidy up some loose ends before we dive in there. There are a few ways to request how input data should be distributed and Pipeline is the one that says "don't move anything". The string "increment" is utterly arbitrary; this happens to be what the operator does, but you could change it to be your name, or a naughty word, or whatever you like. The |capability| stuff should be ignored for the moment; we'll explain in just a moment (it has to do with whether you would like the ability to send data before you receive any).

The heart of the logic lies in the closure that binds input and output. These two are handles respectively to the operator's input (from which it can read records) and the operator's output (to which it can send records).

The input handle input has one primary method, next, which may return a pair consisting of a CapabilityRef<Timestamp> and a batch of data. Rust really likes you to demonstrate a commitment to only looking at valid data, and our while loop does what is called deconstruction: we acknowledge the optional structure and only execute in the case the Option variant is Some, containing data. The next method could also return None, indicating that there is no more data available at the moment. It is strongly recommended that you take the hint and stop trying to read inputs at that point; timely gives you the courtesy of executing whatever code you want in this closure, but if you never release control back to the system you'll break things (timely employs "cooperative multitasking").

The output handle output has one primary method, session, which starts up an output session at the indicated time. The resulting session can be given data in various ways: (i) element at a time with give, (ii) iterator at a time with give_iterator, and (iii) vector at a time with give_content. Internally it is buffering up the output and flushing automatically when the session goes out of scope, which happens above when we go around the while loop.

Other shapes

The unary method is handy if you have one input and one output. What if you want something with two inputs? Or what about zero inputs? We've still got you covered.

There is a binary method which looks a lot like unary, except that it has twice as many inputs (and ways to distribute the inputs), and requires a closure accepting two inputs and one output. You still get to write arbitrary code to drive the operator around as you like.

There is also a method operators::source which .. has no inputs. You can't call it on a stream, for obvious reasons, but you call it with a scope as an argument. It looks just like the other methods, except you supply a closure that just takes an output as an argument and sends whatever it wants each time it gets called. This is great for reading from external sources and moving data along as you like.

Capabilities

We skipped a discussion of the capability argument, and we need to dig into that now.

One of timely dataflow's main features is its ability to track whether an operator may or may not in the future receive more records bearing a certain timestamp. The way that timely does this is by requiring that its operators, like the ones we have written, hold capabilities for sending data at any timestamp. A capability is an instance of the Capability<Time> type, which looks to the outside world like an instance of Time, but which output will demand to see before it allows you to create a session.

Remember up where we got things we called time and from which we created a session with session(&time)? That type was actually a capability.

Likewise, the capability argument that we basically ignored is also a capability. It is a capability for the default value of Time, from which one can send data at any timestamp. All operators get one of these to start out with, and until they downgrade or discard them, they retain the ability to send records at any time. The flip side of this is that the system doesn't make any progress until the operator downgrades or discards the capability.

The capability argument exists so that we can construct operators with the ability to send data before they receive any data. This is occasionally important for unary and binary operators, but it is crucially important for operators with no inputs. If we want to create an operator that reads from an external source and sends data, we'll need to keep hold of some capability.

Here is an example source implementation that produces all numbers up to some limit, each at a distinct time.

extern crate timely;

use timely::dataflow::operators::Inspect;
use timely::dataflow::operators::generic::operator::source;

fn main() {
    timely::example(|scope| {

        source(scope, "Source", |capability, info| {

            // Acquire a re-activator for this operator.
            use timely::scheduling::Scheduler;
            let activator = scope.activator_for(&info.address[..]);

            let mut cap = Some(capability);
            move |output| {

                let mut done = false;
                if let Some(cap) = cap.as_mut() {

                    // get some data and send it.
                    let time = cap.time().clone();
                    output.session(&cap)
                          .give(*cap.time());

                    // downgrade capability.
                    cap.downgrade(&(time + 1));
                    done = time > 20;
                }

                if done { cap = None; }
                else    { activator.activate(); }
            }
        })
        .container::<Vec<_>>()
        .inspect(|x| println!("number: {:?}", x));
    });
}

The details seem a bit tedious, but let's talk them out. The first thing we do is capture capability in the variable cap, whose type is Option<Capability<Time>>. This type is important because it will allow us to eventually discard the capability, replacing it with None. If we always held a Capability<Time>, the best we could do would be to continually downgrade it. Another option is Vec<Capability<Time>>, which we could eventually clear.

Our next step is to define and return a closure that takes output as a parameter. The move keyword is part of Rust and is an important part of making sure that cap makes its way into the closure, rather than just evaporating from the local scope when we return.

The closure does a bit of a dance to capture the current time (not a capability, in this case), create a session with this time and send whatever the time happens to be as data, then downgrade the capability to be one timestep in the future. If it turns out that this is greater than twenty we discard the capability.

The system is smart enough to notice when you downgrade and discard capabilities, and it understands that these actions represent irreversible actions on your part that can now be communicated to others in the dataflow. As this closure is repeatedly executed, the timestamp of the capability will advance and the system will be able to indicate this to downstream operators.

Stateful operators

It may seem that we have only considered stateless operators, those that are only able to read from their inputs and immediately write to their outputs. But, you can have whatever state that you like, using the magic of Rust's closures. When we write a closure, it can capture ("close over") any state that is currently in scope, taking ownership of it. This is actually what we did up above with the capability. If that sounds too abstract, let's look at an example.

Our unary example from way back just incremented the value and passed it along. What if we wanted to only pass values larger than any value we have seen so far? We just define a variable maximum which we check and update as we would normally. Importantly, we should define it outside the closure we return, so that it persists across calls, and we need to use the move keyword so that the closure knows it is supposed to take ownership of the variable.

extern crate timely;

use timely::dataflow::operators::ToStream;
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {
        (0u64..10)
            .to_stream(scope)
            .unary(Pipeline, "increment", |capability, info| {

                let mut maximum = 0;    // define this here; use in the closure
                let mut vector = Vec::new();

                move |input, output| {
                    while let Some((time, data)) = input.next() {
                        data.swap(&mut vector);
                        let mut session = output.session(&time);
                        for datum in vector.drain(..) {
                            if datum > maximum {
                                session.give(datum + 1);
                                maximum = datum;
                            }
                        }
                    }
                }
            })
            .container::<Vec<_>>();
    });
}

This example just captures an integer, but you could just as easily define and capture ownership of a HashMap, or whatever complicated state you would like repeated access to.

Bear in mind that this example is probably a bit wrong, in that we update maximum without paying any attention to the times of the data that come past, and so we may report a sequence of values that doesn't seem to correspond with the sequence when sorted by time. Writing sane operators in the presence of batches of data at shuffled times requires more thought. Specifically, for an operator to put its input back in order it needs to understand which times it might see in the future, which was the reason we were so careful about those capabilities and is the subject of the next subsection.

Frontiered operators

Timely dataflow is constantly tracking the capabilities of operators throughout the dataflow graph, and it reports this information to operators through what are called "frontiers". Each input has an associated frontier, which is a description of the timestamps that might arrive on that input in the future.

Specifically, each input has a frontier method which returns a &[Timestamp], indicating a list of times such that any future time must be greater or equal to some element of the list. Often this list will just have a single element, indicating the "current" time, but as we get to more complicated forms of time ("partially ordered" time, if that means anything to you yet) we may need to report multiple incomparable timestamps.

This frontier information is invaluable for operators that must be sure that their output is correct and final before they send it as output. For our maximum example, we will want to wait to apply the new maximum until we are sure that we will not see any more elements at earlier times. That isn't to say we can't do anything with data we receive "early"; in the case of the maximum, each batch at a given time can be reduced down to just its maximum value, as all received values would be applied simultaneously.

To make life easier for you, we've written a helper type called Notificator whose job in life is to help you keep track of times that you would like to send outputs, and to tell you when (according to your input frontiers) it is now safe to send the data. In fact, notificators do more by holding on to the capabilities for you, so that you can be sure that, even if you don't receive any more messages but just an indication that there will be none, you will still retain the ability to send your messages.

Here is a worked example where we use a binary operator that implements the behavior of concat, but it puts its inputs in order, buffering its inputs until their associated timestamp is complete, and then sending all data at that time. The operator defines and captures a HashMap<Time, Vec<Data>> named stash which it uses to buffer received input data that are not yet ready to send.

extern crate timely;

use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {

        let in1 = (0 .. 10).to_stream(scope);
        let in2 = (0 .. 10).to_stream(scope);

        in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

            let mut notificator = FrontierNotificator::new();
            let mut stash = HashMap::new();

            move |input1, input2, output| {
                while let Some((time, data)) = input1.next() {
                    stash.entry(time.time().clone())
                         .or_insert(Vec::new())
                         .push(data.replace(Vec::new()));
                    notificator.notify_at(time.retain());
                }
                while let Some((time, data)) = input2.next() {
                    stash.entry(time.time().clone())
                         .or_insert(Vec::new())
                         .push(data.replace(Vec::new()));
                    notificator.notify_at(time.retain());
                }

                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, notificator| {
                    let mut session = output.session(&time);
                    if let Some(list) = stash.remove(time.time()) {
                        for mut vector in list.into_iter() {
                            session.give_vec(&mut vector);
                        }
                    }
                });
            }
        });
    });
}

As an exercise, this example could be improved in a few ways. How might you change it so that the data are still sent in the order they are received, but messages may be sent as soon as they are received if their time is currently in the frontier? This would avoid buffering messages that are ready to go, and would only buffer messages that are out-of-order, potentially reducing the memory footprint and improving the effective latency.

Before ending the section, let's rewrite this example without the notificator, in an attempt to demystify how it works. Whether you use a notificator or not is up to you; they are mostly about staying sane in what can be a confusing setting, and you can totally skip them once you have internalized how capabilities and frontiers work.

extern crate timely;

use std::collections::HashMap;
use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {

        let in1 = (0 .. 10).to_stream(scope);
        let in2 = (0 .. 10).to_stream(scope);

        in1.binary_frontier(&in2, Pipeline, Pipeline, "concat_buffer", |capability, info| {

            let mut stash = HashMap::new();

            move |input1, input2, output| {

                while let Some((time, data)) = input1.next() {
                    stash.entry(time.retain())
                         .or_insert(Vec::new())
                         .push(data.replace(Vec::new()));
                }
                while let Some((time, data)) = input2.next() {
                    stash.entry(time.retain())
                         .or_insert(Vec::new())
                         .push(data.replace(Vec::new()));
                }

                // consider sending everything in `stash`.
                let frontiers = &[input1.frontier(), input2.frontier()];
                for (time, list) in stash.iter_mut() {
                    // if neither input can produce data at `time`, ship `list`.
                    if frontiers.iter().all(|f| !f.less_equal(time.time())) {
                        let mut session = output.session(&time);
                        for mut vector in list.drain(..) {
                            session.give_vec(&mut vector);
                        }
                    }
                }

                // discard `time` entries with empty `list`.
                stash.retain(|time, list| list.len() > 0);
            }
        });
    });
}

Take a moment and check out the differences. Mainly, stash is now the one source of truth about time and data, but we now have to do our own checking of time against the input frontiers, and very importantly we need to make sure to discard time from the stash when we are finished with it (otherwise we retain the ability to send at time, and the system will not make progress).

A Worked Example

You may have heard of word_count as the archetypical "big data" problem: you have a large collection of text, and what you want most in life is to know how many of each word are present in the text. The data are too large to load into memory, but let's assume that the set of distinct words, each with an associated count, is small enough to fit in memory.

Let's take the word_count example in the streaming direction. For whatever reason, your collection of text changes. As time moves along, some new texts are added and some old texts are retracted. We don't know why this happens, we just get told about the changes. Our new job is to maintain the word_count computation, in the face of arbitrary changes to the collection of texts, as promptly as possible.

Let's model a changing corpus of text as a list of pairs of times which will be u64 integers with a list of changes which are each pairs (String, i64) indicating the text and whether it has been added (+1) or removed (-1).

We are going to write a program that is the moral equivalent of the following sequential Rust program:

#![allow(unused)]
fn main() {
/// From a sequence of changes to the occurrences of text,
/// produce the changing counts of words in that text.
fn word_count(mut history: Vec<(u64, Vec<(String, i64)>)>) {
    let mut counts = ::std::collections::HashMap::new();
    for (time, mut changes) in history.drain(..) {
        for (text, diff) in changes.drain(..) {
            for word in text.split_whitespace() {
                let mut entry = counts.entry(word.to_owned())
                                      .or_insert(0i64);
                *entry += diff;
                println!("seen: {:?}", (word, *entry));
            }
        }
    }
}
}

This program is fairly straightforward; hopefully you understand its intent, even if you aren't familiar with every method and type. However, the program is also very specific about what must happen: we process the history in order, and for each time we process the text changes in order. The program does not allow for any flexibility here.

Our program will be a bit larger, but it will be more flexible. By specifying more about what we want to happen to the data, and less about which order this needs to happen, we will gain the ability to scale out to multiple workers across multiple machines.

Starting out with text streams

Let's first build a timely computation into which we can send text and which will show us the text back. Our next steps will be to put more clever logic in place, but let's start here to get some boiler-plate out of the way.

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        // create input and output handles.
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // build a new dataflow.
        worker.dataflow(|scope| {
            input.to_stream(scope)
                 .inspect(|x| println!("seen: {:?}", x))
                 .probe_with(&mut probe);
        });

        // feed the dataflow with data.
        for round in 0..10 {
            input.send(("round".to_owned(), 1));
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

This example code is pretty close to a minimal non-trivial timely dataflow computation. It explains how participating timely workers (there may be many, remember) should construct and run a timely dataflow computation.

After some boiler-plate including the timely crate and some of its traits and types, we get to work:

        // create input and output handles.
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

The input handle is how we supply data to the computation, and the probe handle is how we check whether the computation is complete up through certain inputs. Since a streaming computation may never "finish", probe is the only way to understand how much progress we've made.

The next step is to build a timely dataflow. Here we use input as a source of data, and attach probe to the end so that we can watch for completion of work.

        // build a new dataflow.
        worker.dataflow(|scope| {
            input.to_stream(scope)
                 .inspect(|x| println!("seen: {:?}", x))
                 .probe_with(&mut probe);
        });

This computation is pretty simple: it just prints out the inputs we send at it.

Having constructed the dataflow, we feed it some data.

        // feed the dataflow with data.
        for round in 0..10 {
            input.send(("round".to_owned(), 1));
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }

There are several things going on here. First, we send some data into the input, which allows the data to circulate through the workers along the dataflow. This data will be of type (String, i64), because our example wants to send some text and annotate each with the change in the count (we add or remove text with +1 or -1, respectively). Second, we advance_to to tell timely dataflow that we have ceased sending data for round and anything before it. At this point timely can start to reason about round becoming complete, once all the associated data make their way through the dataflow. Finally, we repeatedly step the worker until probe reports that it has caught up to round + 1, meaning that data for round are fully flushed from the system (and printed to the screen, one hopes).

Breaking text into words

Let's add a simple operator that takes our text strings we supply as input and breaks them into words.

More specifically, we will take (String, i64) pairs and break them into many (String, i64) pairs with the same i64 value, because if we are adding some text we'll add the words, and if subtracting text we'll subtract the words.

Rather than repeat all the code up above, I'm just going to show you the fragment you insert between to_stream and inspect:

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Inspect, Probe, Map};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        // create input and output handles.
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // build a new dataflow.
        worker.dataflow(|scope| {
            input.to_stream(scope)
                 .flat_map(|(text, diff): (String, i64)|
                     text.split_whitespace()
                         .map(move |word| (word.to_owned(), diff))
                         .collect::<Vec<_>>()
                 )
                 .inspect(|x| println!("seen: {:?}", x))
                 .probe_with(&mut probe);
        });

        // feed the dataflow with data.
        for round in 0..10 {
            input.send(("round".to_owned(), 1));
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

The flat_map method expects to be told how to take each record and turn it into an iterator. Here, we are saying that each received text should be split (at whitespace boundaries), and each resulting word should be paired up with diff. We do a weird collect thing at the end because split_whitespace tries to hand back pointers into text and it makes life complicated. Sorry, blame Rust (and then blame me for using Rust).

This code should now show us the stream of (word, diff) pairs that fly by, but we still haven't done anything complicated with them yet.

Maintaining word counts

This gets a bit more interesting. We don't have an operator to maintain word counts, so we are going to write one.

We start with a stream of words and differences coming at us. This stream has no particular structure, and in particular if the stream is distributed across multiple workers we have no assurance that all instances of the same word are at the same worker. This means that if each worker just adds up the counts for each word, we will get a bunch of partial results, local to each worker.

We will need to introduce data exchange, where the workers communicate with each other to shuffle the data so that the resulting distribution provides correct results. Specifically, we are going to distribute the data so that each individual word goes to the same worker, but the words themselves may be distributed across workers.

Having exchanged the data, each worker will need a moment of care when it processes its inputs. Because the data are coming in from multiple workers, they may no longer be in "time order"; some workers may have moved through their inputs faster than others, and may be producing data for the next time while others lag behind. This operator means to produce the word count changes as if processed sequentially, and it will need to delay processing changes that come early.

As before, I'm just going to show you the new code, which now lives just after flat_map and just before inspect:

extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Inspect, Probe, Map};

use std::collections::HashMap;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;


fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        // create input and output handles.
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // build a new dataflow.
        worker.dataflow::<usize,_,_>(|scope| {
            input.to_stream(scope)
                .flat_map(|(text, diff): (String, i64)|
                    text.split_whitespace()
                        .map(move |word| (word.to_owned(), diff))
                        .collect::<Vec<_>>()
                )
                .unary_frontier(
                    Exchange::new(|x: &(String, i64)| (x.0).len() as u64),
                    "WordCount",
                    |_capability, operator_info| {

                    // allocate operator-local storage.
                    let mut queues = HashMap::new();
                    let mut counts = HashMap::new();
                    let mut buffer = Vec::new();

                    move |input, output| {

                        // for each input batch, stash it at `time`.
                        while let Some((time, data)) = input.next() {
                            queues.entry(time.retain())
                                  .or_insert(Vec::new())
                                  .extend(data.replace(Vec::new()));
                        }

                        // enable each stashed time if ready.
                        for (time, vals) in queues.iter_mut() {
                            if !input.frontier().less_equal(time.time()) {
                                let vals = std::mem::replace(vals, Vec::new());
                                buffer.push((time.clone(), vals));
                            }
                        }

                        // drop complete time and allocations.
                        queues.retain(|time, vals| vals.len() > 0);

                        // sort ready updates by time.
                        buffer.sort_by(|x,y| (x.0).time().cmp(&(y.0).time()));

                        // retire updates in time order.
                        for (time, mut vals) in buffer.drain(..) {
                            let mut session = output.session(&time);
                            for (word, diff) in vals.drain(..) {
                                let entry = counts.entry(word.clone()).or_insert(0i64);
                                *entry += diff;
                                session.give((word, *entry));
                            }
                        }
                    }
                })
                 .container::<Vec<_>>()
                 .inspect(|x| println!("seen: {:?}", x))
                 .probe_with(&mut probe);
        });

        // feed the dataflow with data.
        for round in 0..10 {
            input.send(("round".to_owned(), 1));
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

That was probably a lot to see all at once. So let's break down each of the things we did.

.unary_frontier(
    Exchange::new(|x: &(String, i64)| (x.0).len() as u64),
    "WordCount",
    |_capability, operator_info| {
        // coming soon!
    }
)

The very first thing we did was state that we are going to build a new unary dataflow operator. Timely lets you build your own operators just by specifying the logic for them as a closure. So easy! But, we have to explain a few things to the operator.

First, we tell it how it should distribute the data (pairs of strings and differences) between workers. Here we are saying "by the length of the text" which is a deranged way to do it, but we'd need about five more lines to properly write hashing code for the string.

Second, we give a descriptive name so that the operator is recognizable in logging and diagnostic code; you probably don't care at the moment, but you might later on if you wonder what is going on.

Third and finally, we specify a closure. The closure has an argument, which we ignore in the code (it has to do with writing operators that can send output data before they receive any input data) and we will ignore it now. This closure is actually a "closure builder": it is a closure that just returns another closure:

    // allocate operator-local storage.
    let mut queues = HashMap::new();
    let mut counts = HashMap::new();
    let mut buffer = Vec::new();

    move |input, output| {
        // coming soon!
    }

The closure that we end up returning is the |input, output| closure. It describes what the operator would do when presented with a handle to the input and a handle to the output. We've also named two hash maps and a vector we will need, and provided the move keyword to Rust so that it knows that the resulting closure owns these hash maps, rather than borrows them.

Inside the closure, we do two things: (i) read inputs and (ii) update counts and send outputs. Let's do the input reading first:

        // for each input batch, stash it at `time`.
        while let Some((time, data)) = input.next() {
            queues.entry(time.retain())
                  .or_insert(Vec::new())
                  .extend(data.replace(Vec::new()));
        }

The input handle has a next method, and it optionally returns a pair of time and data, representing a timely dataflow timestamp and a hunk of data bearing that timestamp, respectively. Our plan is to iterate through all available input (the next() method doesn't block, it just returns None when it runs out of data), accepting it from the timely dataflow system and moving it into our queue hash map.

Why do we do this? Because this is a streaming system, we could be getting data out of order. Our goal is to update the counts in time order, and to do this we'll need to enqueue what we get until we also get word that the associated time is complete. That happens in the next few hunks of code

First, we extract those times and their data that are ready to go:

        // enable each stashed time if ready.
        for (time, vals) in queues.iter_mut() {
            if !input.frontier().less_equal(time.time()) {
                let vals = std::mem::replace(vals, Vec::new());
                buffer.push((time.clone(), vals));
            }
        }

Here we look through each (time, vals) pair that we've queued up. We then check input.frontier, which is what tells us whether we might expect more times or not. The input.frontier() describes times we may yet see on the input; if it is less_equal to the time, then it is possible there might be more data.

If the time is complete, we extract the data and get ready to act on it. We don't actually act yet, because many times may become available at once, and we want to process them in order too. Before we do that, some housekeeping:

        // drop complete time and allocations.
        queues.retain(|time, vals| vals.len() > 0);

        // sort ready updates by time.
        buffer.sort_by(|x,y| (x.0).time().cmp(&(y.0).time()));

These calls clean up the queues hash map removing keys we are processing, and then sort buffer by time to make sure we process them in order. This first step is surprisingly important: the keys of this hash map are timestamps that can be used to send data, and we need to drop them for timely dataflow to understand that we give up the ability to send data at these times.

Finally, we drain buffer and process updates in time order

        // retire updates in time order.
        for (time, mut vals) in buffer.drain(..) {
            let mut session = output.session(&time);
            for (word, diff) in vals.drain(..) {
                let entry = counts.entry(word.clone()).or_insert(0i64);
                *entry += diff;
                session.give((word, *entry));
            }
        }

Here we process each time in order (we sorted them!). For each time, we create a new output session from output using time More importantly, this actually needs to be the same type as time from before; the system is smart and knows that if you drop all references to a time you cannot create new output sessions. It's a feature, not a bug.

We then proceed through each of the batches we enqueue, and through each of the (word, diff) pairs in each of the batches. I've decided that what we are going to do is update the count and announce the new count, but you could probably imagine doing lots of different things here.

The finished product

You can check out the result in examples/wordcount.rs. If you run it as written, you'll see output that looks like:

    Echidnatron% cargo run --example wordcount
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/wordcount`
    seen: ("round", 1)
    seen: ("round", 2)
    seen: ("round", 3)
    seen: ("round", 4)
    seen: ("round", 5)
    seen: ("round", 6)
    seen: ("round", 7)
    seen: ("round", 8)
    seen: ("round", 9)
    seen: ("round", 10)
    Echidnatron%

We kept sending the same word over and over, so its count went up. Neat. If you'd like to run it with two workers, you just need to put -- -w2 at the end of the command, like so:

    Echidnatron% cargo run --example wordcount -- -w2
        Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
        Running `target/debug/examples/wordcount -w2`
    seen: ("round", 1)
    seen: ("round", 2)
    ...
    seen: ("round", 19)
    seen: ("round", 20)
    Echidnatron%

Because there are two workers, each inputting "round" repeatedly, we count up to twenty. By the end of this text you should be able to produce more interesting examples, for example reading the contents of directories and divvying up responsibility for the files between the workers.

Running Timely Dataflows

In this section we will look at driving a timely dataflow computation.

With a dataflow graph defined, how do we interactively supply data to the computation, and how do we understand what the computation has actually done given that we are not ourselves doing it? These are good questions, and the dataflow execution model is indeed a bit of a departure from how most folks first experience programming.

The first thing to understand about timely dataflow is that we are programming the worker threads. Part of this program is asking the worker to build up a dataflow graph; we did that when we created an InputHandle and when we called dataflow followed by some filter and map and probe commands. But another part is where we actually start feeding the dataflow graph, advancing the inputs, and letting the worker give each of the operators a chance to run.

for round in 0..10 {
    input.send(round);
    input.advance_to(round + 1);
    while probe.less_than(input.time()) {
        worker.step();
    }
}

This is the loop that we've seen in several examples. It looks fairly simple, but this is what actually causes work to happen. We do send data and advance the input, but we also call worker.step(), and this is where the actual timely dataflow computation happens. Until you call this, all the data are just building up in queues.

In this section, we'll look at these moving parts in more detail.

Providing Input

The first thing we often see is input.send with some data. This moves the supplied data from the current scope into a queue shared with the input dataflow operator. As this queue starts to fill, perhaps due to you calling send a lot, it moves the data along to its intended recipients. This probably means input queues of other operators, but it may mean serialization and network transmission.

You can call send as much as you like, and the InputHandle implementation will keep moving the data along. The worst that is going to happen is depositing data in shared queues and perhaps some serialization; the call to send will not block, and it should not capture your thread of execution to do any particularly expensive work.

However, since we are writing the worker code, you may want to take a break from send every now and again and let some of the operators run (in just a moment). Part of efficient streaming computation is keeping the data moving, and building up only relatively small buffers before giving the operators a chance to run.

Controlling capabilities

The second thing we often see is input.advance_to with a time. This is an exciting moment where the input announces that it will no longer send data timestamped with anything not greater or equal to its argument. This is big news for the rest of the system, as any operator waiting on the timestamp you previously held can now get to work (or, once all the messages you sent have drained, it can get to work).

It is a logic error to call advance_to with a time that is not greater or equal to the current time, which you can read out with input.time. Timely will check this for you and panic if you screw it up. It is a bit like accessing an array out of bounds: you can check ahead of time if you are about to screw up, but you went and did it anyhow.

Finally, you might be interested to call input.close. This method consumes the input and thereby prevents you from sending any more data. This information is very exciting to the system, which can now tell dataflow operators that they won't be hearing much of anything from you any more.

TIP: It is very important to keep moving your inputs along if you want your dataflow graph to make progress. One of the most common classes of errors is forgetting to advance an InputHandle, and then waiting and waiting and waiting for the cumulative count of records (or whatever) to come out the other end. Timely really wants you to participate and be clear about what you will and will not do in the future.

At the same time, timely's progress tracking does work proportional to the number of timestamps you introduce. If you use a new timestamp for every record, timely will flush its buffers a lot, get very angry with you, and probably fall over. To the extent that you can batch inputs, sending many with the same timestamp, the better.

Monitoring Probes

On the flip side of inputs we have probes. Probes aren't outputs per se, but rather ways for you to monitor progress. They report on the possible timestamps remaining at certain places in the dataflow graph (wherever you attach them).

The easiest way to create a ProbeHandle is by calling .probe() on a stream. This attaches a probe to a point in the dataflow, and when you inspect the probe (in just a moment) you'll learn about what might happen at that point.

You can also create a ProbeHandle directly with ProbeHandle::new(). Such a probe handle is not very interesting yet, but you can attach a probe handle by calling .probe_with(&mut handle) on a stream. This has the cute benefit that you can attach one probe to multiple parts of the dataflow graph, and it will report on the union of their times. If you would like to watch multiple outputs, you could call .probe() multiple times, or attach one common handle to each with multiple calls to .probe_with(). Both are reasonable, depending on whether you need to distinguish between the multiple locations.

A probe handle monitors information that timely provides about the availability of timestamps. You can think of it as holding on to a Vec<Time>, where any possible future time must be greater or equal to one of the elements in the list.

There are a few convenience methods provided, which allow helpful access to the state a probe handle wraps:

  1. The less_than(&Time) method returns true if a time strictly less than the argument is possible.
  2. The less_equal(&Time) method returns true if a time less or equal to the argument is possible.
  3. The done() method returns true if no times are possible.

Probe handles also have a with_frontier method that allows you to provide a closure that can observe the frontier and return arbitrary results. This is a bit of a song and dance, because the frontier is shared mutable state and cannot be trivially handed back up to your code without peril (you would gain a RefMut that may cause the system to panic if you do not drop before calling worker.step()).

The most common thing to do with a probe handle is to check whether we are "caught up" to the input times. The following is a very safe idiom for doing this:

probe.less_than(input.time())

This checks if there are any times strictly less than what the input is positioned to provide next. If so, it means we could keep doing work and making progress, because we know that the system could catch up to input.time() as we can't produce anything less than this from input.

However, you are free to use whatever logic you like. Perhaps you just want to check this test a few times, rather than iterating for as long as it is true (which we commonly do). This would give the dataflow a chance to catch up, but it would start work on the next batch of data anyhow, to keep things moving along. There is a trade-off between overloading the system (if you provide data faster than you can retire it) and underloading it by constantly waiting rather than working.

Operator Execution

Perhaps the most important statement in a timely dataflow program:

worker.step()

This is the method that tells the worker that it is now a good time to schedule each of the operators. If you recall, when designing our dataflow we wrote these operators, each of which were programmed by what they would do when shown their input and output handles. This is where we run that code.

The worker.step() call is the heart of data processing in timely dataflow. The system will do a swing through each dataflow operator and call in to its closure once. Each operator has the opportunity to drain its input and produce some output, and depending on how they are coded they may do just that.

Importantly, this is also where we start moving data around. Until we call worker.step() all data are just sitting in queues. The parts of our computation that do clever things like filtering down the data, or projecting out just a few small fields, or pre-aggregating the data before we act on it, these all happen here and don't happen until we call this.

Make sure to call worker.step() now and again, like you would your parents.

Extending Dataflows

This might be surprising to see in the "Running Timely Dataflows" section, but it is worth pointing out. Just because we built one dataflow doesn't mean that we have to stop there. We can run one dataflow for a while, and then create a second dataflow and run it. While we are running those (just one worker.step() calls into both) we could create a third and start running that too.

The worker can track an arbitrary number of dataflows, and will clean up after each of them once it is complete (when no capabilities exist in the dataflow). You are welcome to spin up as many as you like, if there are reasons you might need several in the course of one program.

You can also do something fun that we're working on (in progress), which is to map in shared libraries and load the dataflows they define. This gives rise to something like a "timely dataflow cluster" that can accept jobs (in the form of shared libraries) which are installed and run sharing the resources with other dataflows. Of course, if they crash they take down everything, so bear this in mind before getting too excited.

Advanced Timely Dataflow

In this section we will cover some of the more advanced topics in timely dataflow. Most of these topics derive from existing concepts, but build up the complexity a level.

Much of what we'll learn about here looks like control flow: dataflow patterns that cause our computation to execute in interesting and new ways, other than a straight line of dataflow. These patterns connect back to idioms from sequential imperative data processing, but using progress information to resculpt the movement of data, changing what computation actually occurs.

Scopes

The first bit of complexity we will introduce is the timely dataflow scope. A scope is a region of dataflow, much like how curly braces provide scopes in imperative languages:

#![allow(unused)]
fn main() {
{
    // this is an outer scope

    {
        // this is an inner scope
    }
}
}

You can create a new scope in any other scope by invoking the scoped method:

extern crate timely;

use timely::dataflow::Scope;

fn main() {
    timely::example(|scope| {

        // Create a new scope with the same (u64) timestamp.
        scope.scoped::<u64,_,_>("SubScope", |subscope| {
            // probably want something here
        })

    });
}

The main thing that a scope does for you is allow you to enrich the timestamp of all streams that are brought into it. For example, perhaps the timestamp type in your base dataflow scope is usize, your nested scope could augment this to (usize, u32), or whatever additional timestamp type you might want. In the example above, we choose to keep the same timestamp, u64.

Why are additional timestamps useful? They allow operators in the nested scope to change their behavior based on the enriched timestamp, without worrying the streams and operators outside the scope about this detail. We will soon see the example of iterative dataflow, where a scope allows cycles within it, but does not bother operators outside the scope with that detail.

Entering and exiting scopes

In addition to creating scopes, we will also need to get streams of data into and out of scopes.

There are two simple methods, enter and leave, that allow streams of data into and out of scopes. It is important that you use them! If you try to use a stream in a nested scope, Rust will be confused because it can't get the timestamps of your streams to typecheck.

extern crate timely;

use timely::dataflow::Scope;
use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        let stream = (0 .. 10).to_stream(scope);

        // Create a new scope with the same (u64) timestamp.
        let result = scope.scoped::<u64,_,_>("SubScope", |subscope| {
            stream.enter(subscope)
                  .inspect_batch(|t, xs| println!("{:?}, {:?}", t, xs))
                  .leave()
        });

    });
}

Notice how we can both enter a stream and leave in a single sequence of operations.

The enter operator introduces each batch of records as a batch with an enriched timestamp, which usually means "the same" or "with a new zero coordinate". The leave just de-enriches the timestamp, correspondingly "the same" or "without that new coordinate". The leave operator results in a stream fit for consumption in the containing scope.

Regions

There is a handy shortcut for scopes that do not change the timestamp type, a region.

extern crate timely;

use timely::dataflow::Scope;
use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        let stream = (0 .. 10).to_stream(scope);

        // Create a new scope with the same (u64) timestamp.
        let result = scope.region(|subscope| {
            stream.enter(subscope)
                  .inspect_batch(|t, xs| println!("{:?}, {:?}", t, xs))
                  .leave()
        });

    });
}

Regions are mostly here to help you organize your dataflow. A region presents itself to its surrounding dataflow as a single operator, and that can make it easier for you or others to reason about the logical structure of your dataflow.

IMPORTANT Although you can probably write a program that uses region() but then skips the calls to enter and leave, because the timestamp types are the same, this is very naughty and please do not do this.

Iteration

Iteration is a particularly neat form of nested scope in which all timestamps are enriched with an iteration counter. This counter starts at zero for streams that enter the scope, may increment in the context of the loop (next section gives examples of this), and is removed when the leave method is called.

The enriched timestamp type is timely::order::Product<TOuter, TInner>, for an outer timestamp type TOuter and an iteration counter type TInner. However, there is a convenience method that can allow you to skip thinking too hard about this.

extern crate timely;

use timely::dataflow::Scope;
use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        let stream = (0 .. 10).to_stream(scope);

        // Create a new scope with a (u64, u32) timestamp.
        let result = scope.iterative::<u32,_,_>(|subscope| {
            stream.enter(subscope)
                  .inspect_batch(|t, xs| println!("{:?}, {:?}", t, xs))
                  .leave()
        });

    });
}

Iteration

One of the neatest things you can do in timely dataflow is iteration. It is a consequence of how timely dataflow tracks progress that we can build loops, circulate data within the loop, and still allow downstream operators to properly judge the progress of their input streams.

There is nothing special about iterative scopes. They are just timely dataflow scopes. In fact, we could even introduce cycles into the root scope that all dataflow calls provide.

What timely dataflow provides is a special stream called a LoopVariable. This is a stream whose contents are not yet defined. You can start using the stream as if they were, but they won't be until later on in the dataflow when we define them.

That may be a bit abstract, so let's look at a simple example.

We are going to check the Collatz conjecture, which says that if you repeatedly divide even numbers by two, and multiply odd numbers by three and add one, you eventually reach the number one. We could do this in lots of ways, but this is the timely dataflow way to do it.

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        // create a loop that cycles unboundedly.
        let (handle, stream) = scope.feedback(1);

        // circulate numbers, Collatz stepping each time.
        (1 .. 10)
            .to_stream(scope)
            .concat(&stream)
            .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
            .inspect(|x| println!("{:?}", x))
            .filter(|x| *x != 1)
            .branch_when(|t| t < &100).1
            .connect_loop(handle);
    });
}

This program first creates a loop variable, using the feedback method on scopes. This method comes from the Feedback extension trait in dataflow::operators, in case you can't find it. When we create a new loop variable, we have to tell timely dataflow by how much we should increment the timestamp each time around the loop. To be more specific, we have to give a path summary which often is just a number that tells us by how much to increment the timestamp. When we later connect the output of an operation back to this loop variable we can specify an upper bound on the number of iterations by using the branch_when method.

We start with a stream of the numbers from one through nine, because we have to start somewhere. Our plan is to repeatedly apply the Collatz step, and then discard any numbers equal to one, but we want to apply this not only to our input but also to whatever comes back around our loop variable. So, the very first step is to concat our input stream with the feedback stream. Then we can apply the Collatz step, filter out the ones, and then connect the resulting stream as the definition of the feedback stream.

We've built an upper limit of 100 in so that we don't spin out of control, in case the conjecture is false. It turns out that 9 will take 19 steps to converge, so this should be good enough. You could try it out for larger numbers!

Mutual Recursion

You can have as many loop variables as you want!

Perhaps you are a very clever person, and you've realized that we don't need to test the results of odd numbers to see if they are one, which would be easy if we broke our stream apart into streams of even and odd numbers. We can do that, by having two loop variables, one for even (0) and one for odd (1).

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        // create a loop that cycles unboundedly.
        let (handle0, stream0) = scope.feedback(1);
        let (handle1, stream1) = scope.feedback(1);

        // do the right steps for even and odd numbers, respectively.
        let results0 = stream0.map(|x| x / 2).filter(|x| *x != 1);
        let results1 = stream1.map(|x| 3 * x + 1);

        // partition the input and feedback streams by even-ness.
        let parts =
            (1 .. 10)
                .to_stream(scope)
                .concat(&results0)
                .concat(&results1)
                .inspect(|x| println!("{:?}", x))
                .partition(2, |x| (x % 2, x));

        // connect each part appropriately.
        parts[0].connect_loop(handle0);
        parts[1].connect_loop(handle1);
    });
}

This is a different way to do the same computation (similar; because we've moved input from the head of the dataflow, the limit of 100 is effectively reduced by one). I won't say it is a better way to do the same computation, but it is different.

Scopes

Of course, you can do all of this in a nested scope, if that is appropriate. In the example above, we just used the ambient u64 timestamp from timely::example, but perhaps you were hoping that it would correspond to streamed input, or something else. We just need to introduce a new nested scope, in that case.

extern crate timely;

use timely::dataflow::operators::*;
use timely::dataflow::Scope;

fn main() {
    timely::example(|scope| {

        let input = (1 .. 10).to_stream(scope);

        // Create a nested iterative scope.
        // Rust needs help understanding the iteration counter type.
        scope.iterative::<u64,_,_>(|subscope| {

            let (handle, stream) = subscope.loop_variable(1);

            input
                .enter(subscope)
                .concat(&stream)
                .map(|x| if x % 2 == 0 { x / 2 } else { 3 * x + 1 } )
                .inspect(|x| println!("{:?}", x))
                .filter(|x| *x != 1)
                .connect_loop(handle);
        })

    });
}

We could also pop out the results with leave if we had any.

Exercise: Rewrite the above example so that while iterating each tuple tracks (i) its progenitor number, and (ii) the number of times it has gone around the loop, and rather than discarding tuples whose value is now one they leave the loop and report for each input number how many steps it takes to reach one.

Flow Control

IN PROGRESS

Data flow along dataflow graphs. It is what they do. It says it in the name. But sometimes we want to control how the data in the dataflow flow. Not where the data flow, we already have controls for doing that (exchange, partition), but rather when the data flow.

Let's consider a simple example, where we take an input stream of numbers, and produce all numbers less than each input.

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        // Produce all numbers less than each input number.
        (1 .. 10)
            .to_stream(scope)
            .flat_map(|x| (0 .. x));

    });
}

Each number that you put into this dataflow (those 0 .. 10 folks) can produce a larger number of output records. Let's say you put 1000 .. 1010 as input instead? We'd have ten thousand records flowing around. What about 1000 .. 2000?

This dataflow can greatly increase the amount of data moving around. We might have thought "let's process just 1000 records", but this turned into one million records. Perhaps we have a few of these operators in a row (don't ask; it happens), we can pretty quickly overwhelm the system if we aren't careful.

In most systems this is mitigated by flow control, mechanisms that push back when it seems like operators are producing more data than can be consumed in the same amount of time.

Timely dataflow doesn't have a built in notion of flow control. Sometimes you want it, sometimes you don't, so we didn't make you have it. Also, it is hard to get right, for similar reasons. Instead, timely dataflow scopes can be used for application-level flow control.

Let's take a simple example, where we have a stream of timestamped numbers coming at us, performing the flat_map up above. Our goal is to process all of the data, but to do so in a controlled manner where we never overwhelm the computation. For example, we might want to do approximately this:

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {

        // Produce all numbers less than each input number.
        (1 .. 100_000)
            .to_stream(scope)
            .flat_map(|x| (0 .. x));

    });
}

but without actually producing the 4,999,950,000 intermediate records all at once.

One way to do this is to build a self-regulating dataflow, into which we can immediately dump all the records, but which will buffer records until it is certain that the work for prior records has drained. We will write this out in all the gory details, but these operators are certainly things that could be packaged up and reused.

The idea here is to take our stream of work, and to use the delay operator to assign new timestamps to the records. We will spread the work out so that each timestamp has at most (in this case) 100 numbers. We can write a binary operator that will buffer received records until their timestamp is "next", meaning all strictly prior work has drained from the dataflow fragment. How do we do this? We turn our previously unary operator into a binary operator that has a feedback edge connected to its second input. We use the frontier of that feedback input to control when we emit data.

extern crate timely;

use timely::dataflow::operators::*;
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::example(|scope| {

        let mut stash = ::std::collections::HashMap::new();

        // Feedback loop for noticing progress.
        let (handle, cycle) = scope.feedback(1);

        // Produce all numbers less than each input number.
        (1 .. 100_000u64)
            .to_stream(scope)
            // Assign timestamps to records so that not much work is in each time.
            .delay(|number, time| number / 100 )
            // Buffer records until all prior timestamps have completed.
            .binary_frontier(&cycle, Pipeline, Pipeline, "Buffer", move |capability, info| {

                let mut vector = Vec::new();

                move |input1, input2, output| {

                    // Stash received data.
                    input1.for_each(|time, data| {
                        data.swap(&mut vector);
                        stash.entry(time.retain())
                             .or_insert(Vec::new())
                             .extend(vector.drain(..));
                    });

                    // Consider sending stashed data.
                    for (time, data) in stash.iter_mut() {
                        // Only send data once the probe is not less than the time.
                        // That is, once we have finished all strictly prior work.
                        if !input2.frontier().less_than(time.time()) {
                            output.session(&time).give_iterator(data.drain(..));
                        }
                    }

                    // discard used capabilities.
                    stash.retain(|_time, data| !data.is_empty());
                }
            })
            .flat_map(|x| (0 .. x))
            // Discard data and connect back as an input.
            .filter(|_| false)
            .connect_loop(handle);
    });
}

This version of the code tops out at about 64MB on my laptop and takes 45 seconds, whereas the version with unary commented out heads north of 32GB before closing out after two minutes.

Capture and Replay

Timely dataflow has two fairly handy operators, capture_into and replay_into, that are great for transporting a timely dataflow stream from its native representation into data, and then back again. They are also a fine way to think about interoperating with other systems for streaming data.

Capturing Streams

At its core, capture_into records everything it sees about the stream it is attached to. If some data arrive, it records that. If there is a change in the possibility that timestamps might arrive on its input, it records that.

The capture_into method is relative simple, and we can just look at it:

    fn capture_into<P: EventPusher<S::Timestamp, D>+'static>(&self, event_pusher: P) {

        let mut builder = OperatorBuilder::new("Capture".to_owned(), self.scope());
        let mut input = PullCounter::new(builder.new_input(self, Pipeline));
        let mut started = false;

        let event_pusher1 = Rc::new(RefCell::new(event_pusher));
        let event_pusher2 = event_pusher1.clone();

        builder.build(
            move |frontier| {
                if !started {
                    frontier[0].update(Default::default(), -1);
                    started = true;
                }
                if !frontier[0].is_empty() {
                    let to_send = ::std::mem::replace(&mut frontier[0], ChangeBatch::new());
                    event_pusher1.borrow_mut().push(Event::Progress(to_send.into_inner()));
                }
            },
            move |consumed, _internal, _external| {
                let mut borrow = event_pusher2.borrow_mut();
                while let Some((time, data)) = input.next() {
                    borrow.push(Event::Messages(time.clone(), data.deref_mut().clone()));
                }
                input.consumed().borrow_mut().drain_into(&mut consumed[0]);
                false
            }
        );
    }

The method is generic with respect to some implementor P of the trait EventPusher which defines a method push that accepts Event<T, D> items (we will see a few implementations in just a moment). After a bit of set-up, capture_into builds a new operator with one input and zero outputs, and sets the logic for (i) what to do when the input frontier changes, and (ii) what to do when presented with the opportunity to do a bit of computation. In both cases, we just create new events based on what we see (progress changes and data messages, respectively).

There is a mysterious subtraction of Default::default(), which has to do with the contract that the replaying operators assume the stream starts with such a capability. This prevents the need for the replayers to block on the stream in their operator construction (any operator must state any initial capabilities as part of its construction; it cannot defer that until later).

One nice aspect of capture_into is that it really does reveal everything that an operator sees about a stream. If you got your hands on the resulting sequence of events, you would be able to review the full history of the stream. In principle, this could be a fine place to persist the data, capturing both data and progress information.

Replaying Streams

At its core, replay_into takes some sequence of Event<T, D> items and reproduces the stream, as it was recorded. It is also fairly simple, and we can just look at its implementation as well:

    fn replay_into<S: Scope<Timestamp=T>>(self, scope: &mut S) -> Stream<S, D>{

        let mut builder = OperatorBuilder::new("Replay".to_owned(), scope.clone());
        let (targets, stream) = builder.new_output();

        let mut output = PushBuffer::new(PushCounter::new(targets));
        let mut event_streams = self.into_iter().collect::<Vec<_>>();
        let mut started = false;

        builder.build(
            move |_frontier| { },
            move |_consumed, internal, produced| {

                if !started {
                    internal[0].update(Default::default(), (event_streams.len() as i64) - 1);
                    started = true;
                }

                for event_stream in event_streams.iter_mut() {
                    while let Some(event) = event_stream.next() {
                        match *event {
                            Event::Start => { },
                            Event::Progress(ref vec) => {
                                internal[0].extend(vec.iter().cloned());
                            },
                            Event::Messages(ref time, ref data) => {
                                output.session(time).give_iterator(data.iter().cloned());
                            }
                        }
                    }
                }

                output.cease();
                output.inner().produced().borrow_mut().drain_into(&mut produced[0]);

                false
            }
        );

        stream
   }

The type of self here is actually something that allows us to enumerate a sequence of event streams, so each replayer is actually replaying some variable number of streams. As part of this, our very first action is to amend our initial Default::default() capability to have multiplicity equal to the number of streams we are replaying:

                if !started {
                    internal[0].update(Default::default(), (event_streams.len() as i64) - 1);
                    started = true;
                }

If we have multiple streams, we'll now have multiple capabilities. If we have no stream, we will just drop the capability. This change is important because each source stream believes it has such a capability, and we will eventually see this many drops of the capability in the event stream (though perhaps not immediately; the initial deletion we inserted in capture_into likely cancels with the initial capabilities expressed by the outside world; we will likely need to wait until the captured stream is informed about the completion of messages with the default time).

Having done the initial adjustment, we literally just play out the streams (note the plural) as they are available. The next method is expected not to block, but rather to return None when there are no more events currently available. It is a bit of a head-scratcher, but any interleaving of these streams is itself a valid stream (messages are sent and capabilities claimed only when we hold appropriate capabilities).

An Example

We can check out the examples examples/capture_send.rs and examples/capture_recv.rs to see a paired use of capture and receive demonstrating the generality.

The capture_send example creates a new TCP connection for each worker, which it wraps and uses as an EventPusher. Timely dataflow takes care of all the serialization and stuff like that (warning: it uses abomonation, so this is not great for long-term storage).

extern crate timely;

use std::net::TcpStream;
use timely::dataflow::operators::ToStream;
use timely::dataflow::operators::capture::{Capture, EventWriter};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        let addr = format!("127.0.0.1:{}", 8000 + worker.index());
        let send = TcpStream::connect(addr).unwrap();

        worker.dataflow::<u64,_,_>(|scope|
            (0..10u64)
                .to_stream(scope)
                .capture_into(EventWriter::new(send))
        );
    }).unwrap();
}

The capture_recv example is more complicated, because we may have a different number of workers replaying the stream than initially captured it.

extern crate timely;

use std::net::TcpListener;
use timely::dataflow::operators::Inspect;
use timely::dataflow::operators::capture::{EventReader, Replay};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        let source_peers = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();

        // create replayers from disjoint partition of source worker identifiers.
        let replayers =
        (0 .. source_peers)
            .filter(|i| i % worker.peers() == worker.index())
            .map(|i| TcpListener::bind(format!("127.0.0.1:{}", 8000 + i)).unwrap())
            .collect::<Vec<_>>()
            .into_iter()
            .map(|l| l.incoming().next().unwrap().unwrap())
            .map(|r| EventReader::<_,u64,_>::new(r))
            .collect::<Vec<_>>();

        worker.dataflow::<u64,_,_>(|scope| {
            replayers
                .replay_into(scope)
                .inspect(|x| println!("replayed: {:?}", x));
        })
    }).unwrap(); // asserts error-free execution
}

Almost all of the code up above is assigning responsibility for the replaying between the workers we have (from worker.peers()). We partition responsibility for 0 .. source_peers among the workers, create TcpListeners to handle the connection requests, wrap them in EventReaders, and then collect them up as a vector. The workers have collectively partitioned the incoming captured streams between themselves.

Finally, each worker just uses the list of EventReaders as the argument to replay_into, and we get the stream magically transported into a new dataflow, in a different process, with a potentially different number of workers.

If you want to try it out, make sure to start up the capture_recv example first (otherwise the connections will be refused for capture_send) and specify the expected number of source workers, modifying the number of received workers if you like. Here we are expecting five source workers, and distributing them among three receive workers (to make life complicated):

    shell1% cargo run --example capture_recv -- 5 -w3

Nothing happens yet, so head over to another shell and run capture_send with the specified number of workers (five, in this case):

    shell2% cargo run --example capture_send -- -w5

Now, back in your other shell you should see something like

    shell1% cargo run --example capture_recv -- 5 -w3
    replayed: 0
    replayed: 1
    replayed: 2
    replayed: 3
    replayed: 4
    replayed: 5
    replayed: 0
    replayed: 6
    replayed: 1
    ...

which just goes on and on, but which should produce 50 lines of text, with five copies of 0 .. 10 interleaved variously.

Capture types

There are several sorts of things you could capture into and replay from. In the capture::events module you will find two examples, a linked list and a binary serializer / deserializer (wrapper around Write and Read traits). The binary serializer is fairly general; we used it up above to wrap TCP streams. You could also write to files, or write to shared memory. However, be mindful that the serialization format (abomonation) is essentially the in-memory representation, and Rust makes no guarantees about the stability of such a representation across builds.

There is also an in-progress Kafka adapter available in the repository, which uses Kafka topics to store the binary representation of captured streams, which can then be replayed by any timely computation that can read them. This may be a while before it is sorted out, because Kafka seems to have a few quirks, but if you would like to help get in touch.

Custom Datatypes

WORK IN PROGRESS

Timely dataflow allows you to use a variety of Rust types, but you may also find that you need (or would prefer) your own struct and enum types.

Timely dataflow provides two traits, Data and ExchangeData for types that timely dataflow can transport within a worker thread and across threads.

The Data trait

The Data trait is essentially a synonym for Clone+'static, meaning the type must be cloneable and cannot contain any references with other than a static lifetime. Most types implement these traits automatically, but if yours do not you should decorate your struct definition with a derivation of the Clone trait:

#[derive(Clone)]
struct YourStruct { .. }

The ExchangeData trait

The ExchangeData trait is more complicated, and is established in the communication/ module. There are two options for this trait, which are determined by whether you use the --bincode feature at compilation, or not.

  • If you use --bincode then the trait is a synonym for

    Send+Sync+Any+serde::Serialize+for<'a>serde::Deserialize<'a>+'static

    where serde is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your types does not, you should add these decorators to their definition:

    #[derive(Serialize, Deserialize)]

    You must include the serde crate, and if not on Rust 2018 the serde_derive crate.

    The downside to the --bincode flag is that deserialization will always involve a clone of the data, which has the potential to adversely impact performance. For example, if you have structures that contain lots of strings, timely dataflow will create allocations for each string even if you do not plan to use all of them.

  • If you do not use the --bincode feature, then the Serialize and Deserialize requirements are replaced by Abomonation, from the abomonation crate. This trait allows in-place deserialization, but is implemented for fewer types, and has the potential to be a bit scarier (due to in-place pointer correction).

    Your types likely do not implement Abomonation by default, but you can similarly use

    #[derive(Abomonation)]

    You must include the abomonation and abomonation_derive crate for this to work correctly.

An example

Let's imagine you would like to play around with a tree data structure as something you might send around in timely dataflow. I've written the following candidate example:

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .map(|x| TreeNode::new(x))
               .inspect(|x| println!("seen: {:?}", x));
    });
}

// A tree structure.
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        Self { data, children: Vec::new() }
    }
}

This doesn't work. You'll probably get two errors, that TreeNode doesn't implement Clone, nor does it implement Debug. Timely data types need to implement Clone, and our attempt to print out the trees requires an implementation of Debug. We can create these implementations by decorating the struct declaration like so:

#![allow(unused)]
fn main() {
#[derive(Clone, Debug)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}
}

This works for me! We see

    Echidnatron% cargo run --example types
       Compiling timely v0.8.0 (/Users/mcsherry/Projects/timely-dataflow)
        Finished dev [unoptimized + debuginfo] target(s) in 5.33s
         Running `target/debug/examples/types`
    seen: TreeNode { data: 0, children: [] }
    seen: TreeNode { data: 1, children: [] }
    seen: TreeNode { data: 2, children: [] }
    seen: TreeNode { data: 3, children: [] }
    seen: TreeNode { data: 4, children: [] }
    seen: TreeNode { data: 5, children: [] }
    seen: TreeNode { data: 6, children: [] }
    seen: TreeNode { data: 7, children: [] }
    seen: TreeNode { data: 8, children: [] }
    seen: TreeNode { data: 9, children: [] }
    Echidnatron%

Exchanging data

Let's up the level a bit and try and shuffle our tree data between workers.

If we replace our main method with this new one:

extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .map(|x| TreeNode::new(x))
               .exchange(|x| x.data)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

#[derive(Clone, Debug)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        Self { data, children: Vec::new() }
    }
}

We get a new error. A not especially helpful error. It says that it cannot find an exchange method, or more specifically that one exists but it doesn't apply to our type at hand. This is because the data need to satisfy the ExchangeData trait but do not. It would be better if this were clearer in the error messages, I agree.

We can fix the problem two ways. First, if you would like to use bincode, then we update the source like so:

#[macro_use]
extern crate serde_derive;
extern crate serde;

#[derive(Clone, Debug, Serialize, Deserialize)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

and make sure to include the serde_derive and serde crates. Now when we run things (notice the --features flag) we see:

    Echidnatron% cargo run --example types --features bincode
        Finished dev [unoptimized + debuginfo] target(s) in 0.07s
         Running `target/debug/examples/types`
    seen: TreeNode { data: 0, children: [] }
    seen: TreeNode { data: 1, children: [] }
    seen: TreeNode { data: 2, children: [] }
    seen: TreeNode { data: 3, children: [] }
    seen: TreeNode { data: 4, children: [] }
    seen: TreeNode { data: 5, children: [] }
    seen: TreeNode { data: 6, children: [] }
    seen: TreeNode { data: 7, children: [] }
    seen: TreeNode { data: 8, children: [] }
    seen: TreeNode { data: 9, children: [] }
    Echidnatron%

Great news!

Internals

Communication

Communication in timely dataflow starts from the timely_communication crate. This crate includes not only communication, but is actually where we start up the various worker threads and establish their identities. As in timely dataflow, everything starts by providing a per-worker closure, but this time we are given only a channel allocator as an argument.

Before continuing, I want to remind you that this is the internals section; you could write your code against this crate if you really want, but one of the nice features of timely dataflow is that you don't have to. You can use a nice higher level layer, as discussed previously in the document.

That being said, let's take a look at the example from the timely_communication documentation, which is not brief but shouldn't be wildly surprising either.

extern crate timely_communication;

fn main() {

    // extract the configuration from user-supplied arguments, initialize the computation.
    let config = timely_communication::Configuration::from_args(std::env::args()).unwrap();
    let guards = timely_communication::initialize(config, |mut allocator| {

        println!("worker {} of {} started", allocator.index(), allocator.peers());

        // allocates a pair of senders list and one receiver.
        let (mut senders, mut receiver) = allocator.allocate();

        // send typed data along each channel
        for i in 0 .. allocator.peers() {
            senders[i].send(format!("hello, {}", i));
            senders[i].done();
        }

        // no support for termination notification,
        // we have to count down ourselves.
        let mut received = 0;
        while received < allocator.peers() {
            if let Some(message) = receiver.recv() {
                println!("worker {}: received: <{}>", allocator.index(), message);
                received += 1;
            }
        }

        allocator.index()
    });

    // computation runs until guards are joined or dropped.
    if let Ok(guards) = guards {
        for guard in guards.join() {
            println!("result: {:?}", guard);
        }
    }
    else { println!("error in computation"); }
}

There are a few steps here, and we'll talk through the important parts in each of them.

Configuration

There is only a limited amount of configuration you can currently do in a timely dataflow computation, and it all lives in the initialize::Configuration type. This type is a simple enumeration of three ways a timely computation could run:

pub enum Configuration {
    Thread,
    Process(usize),
    Cluster(usize, usize, Vec<String>, bool)
}

The first variant Thread indicates that we will simply have one worker thread. This is a helpful thing to know because it means that all of our exchange channels can be dramatically simplified, just down to simple queues. The second variant Process corresponds to multiple worker threads within one process. The number indicates the parameters. The third variant Cluster is how we indicate that this process will participate in a larger clustered computation; we supply the number of threads, this process' identifier, a list of addresses of all participants, and a boolean for whether we would like some diagnostics about the established connections.

The configuration is important because it determines how we build the channel allocator allocator that we eventually provide to each worker: allocator will be responsible for building communication channels to other workers, and it will need to know where these other workers are.

Channel Allocators

The allocator reference bound by the worker closure is the only handle a worker has to the outside world (other than any values you move into the closure). It wraps up all the information we have about this workers place in the world, and provides the ability to assemble channels to the other workers.

There are a few implementations of the Allocate trait, which is defined as

pub trait Allocate {
    fn index(&self) -> usize;
    fn peers(&self) -> usize;
    fn allocate<T: Data>(&mut self) -> (Vec<Box<Push<T>>>, Box<Pull<T>>);
}

These methods are the only functionality provided by allocator. A worker can ask for its own index, which is a number from zero up to the number of total peer workers (including itself), which it can also ask for. Perhaps most importantly, the worker can also request the allocation of a typed channel, which is returned as a pair of (i) a list of Push endpoints into which it can send data, and (ii) a single Pull endpoint from which it can extract data. The list has length equal to the number of peers, and data sent into push endpoint i will eventually be received by the worker with index i, if it keeps pulling on its pull endpoint.

The channels are various and interesting, but should be smartly arranged. The channel from the worker back to itself is just a queue, the channels within the same process are Rust's inter-thread channels, and the channels between processes will automatically serialize and deserialize the type T for you (this is part of the T: Data requirement).

One crucial assumption made in this design is that the channels can be identified by their order of creation. If two workers start executing in different processes, allocating multiple channels, the only way we will know how to align these channels is by identifiers handed out as the channels are allocated. I strongly recommend against non-deterministic channel construction, or "optimizing out" some channels from some workers.

The Data Trait

The Data trait that we impose on all types that we exchange is a "marker trait": it wraps several constraints together, like so

pub trait Data : Send+Any+Serialize+Clone+'static { }
impl<T: Clone+Send+Any+Serialize+'static> Data for T { }

These traits are all Rust traits, except for Serialize, and they mostly just say that we can clone and send the data around. The Serialize trait is something we introduce, and asks for methods to get into and out of a sequence of bytes.

pub trait Serialize {
    fn into_bytes(&mut self, &mut Vec<u8>);
    fn from_bytes(&mut Vec<u8>) -> Self;
}

We have a blanket implementation of Serialize for any type that implements Abomonation. Ideally, you shouldn't have to worry about this, unless you are introducing a new type and need an Abomonation implementation or you are hoping to move some types containing fields that do not satisfy those Rust traits.

Push and Pull

The two traits Push and Pull are the heart of the communication underlying timely dataflow. They are very simple, but relatively subtle and interesting and perhaps even under-exploited.

Push

The Push trait looks like so (with two helper methods elided):

pub trait Push<T> {
    fn push(&mut self, element: &mut Option<T>);
}

That's all of it.

The push method takes a mutable reference to an option wrapped around a thing. This is your way of telling the communication layer that, (i) if the reference points to a thing, you'd really like to push it into the channel, and (ii) if the reference doesn't point to a thing this is the cue that you might walk away for a while. It is important to send a None if you would like to ensure that whatever you've pushed in the past should be guaranteed to get through without further work on your part.

Now, we didn't need a mutable reference to do that; we could have just had the argument type be Option<T>, or had two methods send and done (those are the elided helper methods).

This framing allows for fairly natural and stable zero-copy communication. When you want to send a buffer of records, you wrap it up as Some(buffer) and call push. Once push returns, the channel has probably taken your buffer, but it has the opportunity to leave something behind for you. This is a very easy way for the communication infrastructure to return resources to you. In fact, even if you have finished sending messages, it may make sense to repeatedly send mutable references to None for as long as the channel has memory to hand you.

Although not used by timely at the moment, this is also designed to support zero copy networking where the communication layer below (e.g. something like RDMA) operates more efficiently if it allocates the buffers for you (e.g. in dedicated memory pinned by the hardware). In this case, push is a great way to request resources from the channel. Similarly, it can serve as a decent back-channel to return owned resources for the underlying typed data (e.g., you pushed a list of String elements, and once used they could be returned to you to be reused).

Pull

The Pull trait is the dual to Push: it allows someone on the other end of a channel to request whatever the channel has in store next, also as a mutable reference to an option wrapped around the type.

pub trait Pull<T> {
    fn pull(&mut self) -> &mut Option<T>;
}

As before, the mutable reference and option allow the two participants to communicate about the availability of data, and to return resources if appropriate. For example, it is very natural after the call to pull to claim any T made available with a ::std::mem::swap which puts something else in its place (either Some(other) or None). If the puller has some data to return, perhaps data it received from wherever it was pushing data at, this is a great opportunity to move it back up the communication chain.

I'm not aware of a circumstance where you might be obliged to call pull and set the result to None to signal that you may stop calling Pull. It seems like it could be important, if these methods really are dual, but I don't see how just yet.

Guarded Computation

The call to initialize returns a

Result<WorkerGuards<T>,String>

which is Rust's approach to error handling: we either get some worker guards or a String explaining why things went wrong, perhaps because we weren't able to establish connections with all of the workers, or something like that. The WorkerGuards<T> is a list of thread join handles, ::std::thread::JoinHandle<T>, which is something that we can wait on and expect a T in return. Each of these handles allow us to wait on the local worker threads, and collect whatever they produce as output.

We've wrapped the handles up in a special type, WorkerGuards, because the default behavior otherwise should you just discard the result is for the threads to detach, which results in the main thread exiting and the workers just getting killed. This way, even if you ignore the result we will wait for the worker threads to complete. If you would like your main thread to exit and kill off the workers, you have other ways of doing this.

Progress Tracking

Progress tracking is a fundamental component of timely dataflow, and it is important to understand how it works to have a complete understanding of what timely dataflow does for you.

Let's start with a statement about what progress tracking means to accomplish.

The setting is that there are multiple workers, each of whom move data through a common dataflow graph. The data may move between workers, and as the data are processed by operators we have relatively few guarantees about their consequences: a worker may receive a record and do nothing, or it could send one thousand output records some of which are now destined for us. Nonetheless, we need to be able to make meaningful statements about the possibility of receiving more data.

Timely dataflow's approach is that data bear a logical timestamp, indicating some moment in the computation at which they should be thought to exist. This is not necessarily a physical timestamp, like the worker's clock when the record was created, but could be any type satisfying a few constraints. A common example is sequence numbers, counting up from zero.

Timely dataflow imposes a few constraints, we think they are natural, on the structure of the dataflow graph, from which it is able to make restrictive statements at each location in the dataflow graph of the form "you will only ever see timestamps greater or equal to these times". This provides each dataflow operator with an understanding of progress in the computation. Eventually, we may even learn that the set of future timestamps is empty, indicating completion of the stream of data.

Timely dataflow computations are structured so that to send a timestamped message, an operator must hold a capability for that timestamp. Timely dataflow's progress tracking can be viewed as (i) workers collectively maintaining a view of outstanding timestamp capabilities at each location in the dataflow graph, and (ii) each worker independently determines and communicates the implications of changes in its view of capabilities to other locations in its instance of the dataflow graph.

Before we get into these two aspects, we will first need to be able to name parts of our dataflow graph.

Dataflow Structure

A dataflow graph hosts some number of operators. For progress tracking, these operators are simply identified by their index. Each operator has some number of input ports, and some number of output ports. The dataflow operators are connected by connecting each input port to a single output port (typically of another operator). Each output port may be connected to multiple distinct input ports (a message produced at an output port is to be delivered to all attached input ports).

In timely dataflow progress tracking, we identify output ports by the type Source and input ports by the type Target, as from the progress coordinator's point of view, an operator's output port is a source of timestamped data, and an operator's input port is a target of timestamped data. Each source and target can be described by their operator index and then an operator-local index of the corresponding port. The use of distinct types helps us avoid mistaking input and output ports.

pub struct Source {
    /// Index of the source operator.
    pub index: usize,
    /// Number of the output port from the operator.
    pub port: usize,
}

pub struct Target {
    /// Index of the target operator.
    pub index: usize,
    /// Number of the input port to the operator.
    pub port: usize,
}

The structure of the dataflow graph can be described by a list of all of the connections in the graph, a Vec<(Source, Target)>. From this, we could infer the number of operators and their numbers of input and output ports, as well as enumerate all of the connections themselves.

At this point we have the structure of a dataflow graph. We can draw a circle for each operator, a stub for each input and output port, and edges connecting the output ports to their destination input ports. Importantly, we have names for every location in the dataflow graph, which will either be a Source or a Target.

Maintaining Capabilities

Our first goal is for the workers to collectively track the number of outstanding timestamp capabilities in the system, for each timestamp and at each location, as dataflow operators run and messages are sent and received. Capabilities can exist in two places in timely dataflow: an operator can explicitly hold capabilities to send timestamped messages on each of its outputs, and each timestamped message bears a capability for its timestamp.

When tracking capabilities, we will track their multiplicity: how many capabilities for time t are there at location l? For most locations and times this number will be zero. Unless the computation has completed, for some locations and times this number must be positive. Numbers can also be transiently negative, as reports of changes may arrive out of order.

When a timely dataflow computation starts, there are no messages in flight. Rather, each operator starts with the capabilities to send any timestamped message on any of its outputs. As this is common knowledge among the workers, each initializes its counts with #workers for capabilities at each operator output. Each worker understands that it will need to hear #workers reports of such capabilities being dropped before they are actually out of the system.

As a computation proceeds, operators may perform three classes of action:

  1. They may consume input messages, acquiring the associated capability.
  2. They may clone, downgrade, or drop any capability they hold.
  3. They may send output messages at any timestamp for which they hold the capability.

The results of these actions are a stream of changes to the occurrences of capabilities at each location in the dataflow graph. As input messages are consumed, capabilities located at the corresponding Target input port are decremented, and capabilities at the operators Source output ports are incremented. Cloning, downgrading, and dropping capabilities changes the counts at each of the operators corresponding Source output ports. Sending messages results in increments to the counts at the Target ports of each Target connected to the Source from which the message is sent.

Concretely, a batch of changes has the form (Vec<(Source, Time, i64)>, Vec<(Target, Time, i64)>), indicating the increments and decrements for each time, at each dataflow location. Importantly we must keep these batches intact; the safety of the progress tracking protocol relies on not communicating half-formed progress messages (for example, consuming an input message but forgetting to indicate the acquisition of its capability).

Each worker broadcasts the stream of progress change batches to all workers in the system (including itself) along point-to-point FIFO channels. At any point in time, each worker has seen an arbitrary prefix of the sequence of progress change batches produced by each other worker, and it is expected that as time proceeds each worker eventually sees every prefix of each sequence.

At any point in time, each worker's view of the capabilities in the system is defined by the accumulation of all received progress change batches, plus initial capabilities at each output (with starting multiplicity equal to the number of workers). This view may be surprising and messy (there may be negative counts), but at all times it satisfies an important safety condition, related to the communicated implications of these capabilities, which we now develop.

Communicating Implications

Each worker maintains an accumulation of progress update batches, which explains where capabilities may exist in the dataflow graph. This information is useful, but it is not yet sufficient to make strong statements about the possibility of timestamped messages arriving at other locations in the dataflow graph. Even though a capability for timestamp t may exist, this does not mean that the time may arrive at any location in the dataflow graph. To be precise, we must discuss the paths through the dataflow graph which the timestamp capability could follow.

Path Summaries

Progress tracking occurs in the context of a dataflow graph of operators all with a common timestamp type T: Timestamp. The Timestamp trait requires the PartialOrder trait, meaning two timestamps may be ordered but need not be. Each type implementing Timestamp must also specify an associated type Summary implementing PathSummary<Self>.

pub trait Timestamp: PartialOrder {
    type Summary : PathSummary<Self>;
}

A path summary is informally meant to summarize what must happen to a timestamp as it travels along a path in a timely dataflow. Most paths that you might draw have trivial summaries ("no change guaranteed"), but some paths do force changes on timestamps. For example, a path that goes from the end of a loop back to its head must increment the loop counter coordinate of any timestamp capability passed along it.

pub trait PathSummary<T> : PartialOrder {
    fn results_in(&self, src: &T) -> Option<T>;
    fn followed_by(&self, other: &Self) -> Option<Self>;
}

The types implementing PathSummary must be partially ordered, and implement two methods:

  1. The results_in method explains what must happen to a timestamp moving along a path. Note the possibility of None; a timestamp could not move along a path. For example, a path summary path could increment a timestamp by one, for which
path.results_in(&4) == Some(5);
path.results_in(&u64::max_value()) == None;

It is important that results_in only advance timestamps: for all path summaries p we require that

if let Some(time) = p.results_in(&x) {
    assert!(x.less_equal(&time));
}
  1. The followed_by method explains how two path summaries combine. When we build the summaries we will start with paths corresponding to single edges, and build out more complex paths by combining the effects of multiple paths (and their summaries). As with results_in, it may be that two paths combined result in something that will never pass a timestamp, and the summary may be None (for example, two paths that each increment a loop counter by half of its maximum value).

Two path summaries are ordered if for all timestamps the two results of the path summaries applied to the timestamp are also ordered.

Path summaries are only partially ordered, and when summarizing what must happen to a timestamp when going from one location to another, along one of many paths, we will quickly find ourselves speaking about collections of path summaries. There may be several summaries corresponding to different paths we might take. We can discard summaries from this collection that are strictly greater than other elements of the collection, but we may still have multiple incomparable path summaries.

The second part of progress tracking, communicating the implications of current capabilities, is fundamentally about determining and locking in the minimal collections of path summaries between any two locations (Source or Target) in the dataflow graph. This is the "compiled" representation of the dataflow graph, from which we can derive statements about the possibility of messages at one point in the graph leading to messages at another point.

Operator summaries

Where do path summaries come from?

Each operator in timely dataflow must implement the Operate trait. Among other things, that we will get to, the Operate trait requires that the operator summarize itself, providing a collection of path summaries for each of its internal paths, from each of its inputs to each of its outputs. The operator must describe what timestamps could possibly result at each of its output as a function of a timestamped message arriving at each of its inputs.

For most operators, this summary is simply: "timestamped data at each of my inputs could result in equivalently timestamped data at each of my outputs". This is a fairly simple summary, and while it isn't very helpful in making progress, it is the only guarantee the operator can provide.

The Feedback operator is where most of our interesting path summaries start: it is the operator found in feedback loops that ensures that a specific coordinate of the timestamp is incremented. All cycles in a timely dataflow graph must pass through such an operator, and in cyclic dataflow we see non-trivial summaries from the outputs of downstream operators to the inputs of upstream operators.

Another useful summary is the absence of a summary. Sometimes there is no path between two points in a dataflow graph. Perhaps you do some computation on your dataflow inputs, and then introduce them into an iterative computation; there is no path that returns that data to the computation you do on the inputs. Records in-flight in the iterative subcomputation will not block the computation you do on your inputs, because we know that there is no path back from the iteration to the inputs.

For example, an operator could plausibly have two inputs, a data input and a diagnostic input, and corresponding data and diagnostic outputs. The operator's internal summary could reveal that diagnostic input cannot result in data output, which allows us to issue diagnostic queries (as data for that input) without blocking downstream consumers of the data output. Timely dataflow can see that even though there are messages in flight, they cannot reach the data output and need not be on the critical path of data computation.

A Compiled Representation

From the operator summaries we build path summaries, and from the path summaries we determine, for every pair of either Source or Target a collection of path summaries between the two. How could a timestamped message at one location lead to timestamped messages at the other?

The only constraint we require is that there should be no cycles that do not strictly advance a timestamp.

A Safety Property

Each operator maintains a collection of counts of timestamp capabilities at each location (Source or Target) in the dataflow graph. At the same time, there is a statically defined set of path summaries from each location to any other location in the dataflow graph.

The safety property is: for any collection of counts resulting from the accumulation of arbitrary prefixes of progress update batches from participating workers, if for any location l1 in the dataflow graph and timestamp t1 there is not another location l2 and timestamp t2 with strictly positive accumulated count such that there is a path summary p from l2 to l1 with p(t2) <= t1, then no message will ever arrive at location l1 bearing timestamp t1.

This property has a formalization in TLA courtesy of Tom Rodeheffer, but let's try to develop the intuition for its correctness.

Actually, let's first develop some counter-intuition for its correctness. This property holds, but we've said almost nothing about the communication of messages between workers. The only property we use about message delivery in proving the safety property is that it is "at most once"; messages should not be multiplied in flight. Does it matter in which order messages are delivered? No. Does it matter that messages are ever delivered? No (this is only a safety property). Can operators consume, digest, and send messages that their local progress protocol doesn't even know exist yet? Yes.

There is almost no coordination between the data plane, on which messages get sent, and the control plane, along which progress update batches get sent. The only requirement is that you do not send a progress update batch for some action that you have not performed.

So where do we find intuition for the correctness of the protocol?

Although we may have only seen prefixes of the progress update batches from other workers, we can nonetheless reason about what future progress update batches from each worker will need to look like. In the main, we will use the property that if updates correspond to things that actually happen:

  1. Any message consumed must have a corresponding message produced, even if we haven't heard about it yet.
  2. Any message produced must involve a capability held, even if we haven't heard about it yet.
  3. Any capability held must involve either another capability held, or a message consumed.

We'll argue that for any sequence of progress updates some prefix of which accumulates as assumed in the statement of the safety property, then no extension of these prefixes could possibly include an update acknowledging a message received at l1 with timestamp t1.

First, we define an order on the pairs (li, ti) of locations and timestamps, in the Naiad paper called pointstamps. Pointstamps are partially ordered by the could-result-in relation: (li, ti) could-result-in (lj, tj) if there is a path summary p from li to lj where p(ti) <= tj. This is an order because it is (i) reflexive by definition, (ii) antisymmetric because two distinct pointstamps that could-result-in each other would imply a cycle that does not strictly advance the timestamp, and (iii) transitive by the assumed correctness of path summary construction.

Next, each progress update batch has a property, one that is easier to state if we imagine operators cannot clone capabilities, and must consume a capability to send a message: each atomic progress update batch decrements a pointstamp and optionally increments pointstamps strictly greater than it. Alternately, each progress update batch that increments a pointstamp must decrement a pointstamp strictly less than it. Inductively, any collection of progress update batches whose net effect increments a pointstamp must have a net effect decrementing some pointstamp strictly less than it.

Now, just before a worker might hypothetically receive a message at (l1, t1), we freeze the system. We stop performing new actions, and instead resolve all progress updates for actions actually performed. Let these progress update batches circulate, so that the worker in question now has a complete and current view of the frozen state of the system. All pointstamp counts should now be non-negative.

The additional progress updated batches received as part of stabilizing the system can only have a net positive effect on (l1, t1) if they have a net negative effect on some poinstamp strictly less than it. However, as the final accumulation is non-negative, this can only happen if the strictly prior pointstamp had a positive accumulation before stabilization. If no such pointstamp has a positive accumulation before stabilization, it is not possible for (l1, t1) to have a positive accumulation at stabilization, and consequently there cannot be a message waiting to be received.