Custom Datatypes
WORK IN PROGRESS
Timely dataflow allows you to use a variety of Rust types, but you may also find that you need (or would prefer) your own struct and enum types.
Timely dataflow provides two traits, Data and ExchangeData, for types that timely dataflow can transport within a worker thread and across threads, respectively.
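To see why crossing threads adds requirements, here is a std-only Rust sketch (not timely code) that hands records to a second thread over a channel. Rust itself insists that anything moved to another thread be Send + 'static; ExchangeData adds the serde bounds on top of that, which this sketch deliberately leaves out. The Record type and the ship_to_other_thread helper are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

// A record built only from owned parts (no borrowed references, no Rc),
// so it is both 'static and Send.
#[derive(Clone, Debug, PartialEq)]
struct Record {
    key: u64,
    label: String,
}

// Hypothetical helper: moves each record to a second thread over a channel
// and returns what that thread received, in order. The Send + 'static bound
// is what std::thread::spawn requires; it is only part of ExchangeData.
fn ship_to_other_thread<T: Send + 'static>(records: Vec<T>) -> Vec<T> {
    let (tx, rx) = mpsc::channel::<T>();
    let receiver = thread::spawn(move || rx.into_iter().collect::<Vec<T>>());
    for record in records {
        tx.send(record).expect("receiving thread hung up");
    }
    // Dropping the sender closes the channel, which ends the receiver's loop.
    drop(tx);
    receiver.join().expect("receiving thread panicked")
}

fn main() {
    let records = vec![
        Record { key: 1, label: "one".to_string() },
        Record { key: 2, label: "two".to_string() },
    ];
    // Clone first so we can compare against what came back.
    let received = ship_to_other_thread(records.clone());
    assert_eq!(received, records);
    println!("{:?}", received);
}
```

A type holding an Rc, or a reference with a non-'static lifetime, would be rejected by the compiler at the call to ship_to_other_thread, before anything runs.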
The Data trait
The Data trait is essentially a synonym for Clone + 'static, meaning the type must be cloneable and cannot contain any references except those with a 'static lifetime. Many standard types, such as integers, String, and Vec<T> of cloneable T, already satisfy these bounds. A struct or enum you define yourself, however, does not implement Clone unless you derive or implement it, so you should decorate your type definition with a derivation of the Clone trait:
#[derive(Clone)]
struct YourStruct {
    // your fields here; each field's type must itself implement Clone
}
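A minimal sketch of what "essentially a synonym" can look like in Rust, assuming the common pattern of a method-less trait with a blanket implementation. The trait below is a local stand-in named Data, not an import of timely's trait; the fields of YourStruct and the copies helper are made up for illustration.

```rust
// Local stand-in for the Data trait described above: no methods of its own,
// implemented for every type that is Clone + 'static.
trait Data: Clone + 'static {}
impl<T: Clone + 'static> Data for T {}

// Owned fields only, so deriving Clone is enough to satisfy Data.
// (Debug and PartialEq are derived just so main can print and compare.)
#[derive(Clone, Debug, PartialEq)]
struct YourStruct {
    name: String,
    counts: Vec<u64>,
}

// Hypothetical helper: makes n copies of one record, as a dataflow might
// when the same record has to reach several consumers.
fn copies<D: Data>(record: &D, n: usize) -> Vec<D> {
    (0..n).map(|_| record.clone()).collect()
}

fn main() {
    let record = YourStruct { name: "example".to_string(), counts: vec![1, 2, 3] };
    let copied = copies(&record, 3);
    assert!(copied.iter().all(|c| *c == record));

    // Standard types qualify too; a struct holding a reference with a
    // non-'static lifetime would not.
    let numbers = copies(&42u64, 2);
    println!("{:?} {:?}", copied[0], numbers);
}
```

Because of the blanket implementation, nothing ever needs to write "impl Data for YourStruct"; deriving Clone is the whole job.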
The ExchangeData trait
The ExchangeData trait is more complicated, and is established in the communication/ module. The trait is a synonym for
Send + Sync + Any + serde::Serialize + for<'a> serde::Deserialize<'a> + 'static
where serde is Rust's most popular serialization and deserialization crate. A great many types implement these traits. If your type does not, you should add these decorators to its definition:
extern crate serde;
use serde::{Serialize, Deserialize};

#[derive(Serialize, Deserialize)]
struct Dummy {}
You must include the serde crate and, if you are not on Rust 2018, the serde_derive crate as well.
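On Rust 2018, one common way to get the Serialize and Deserialize derives is to enable serde's derive feature in Cargo.toml, after which the line use serde::{Serialize, Deserialize}; resolves. A sketch only; the version requirement shown is an assumption, so pick the release you actually want.

```toml
[dependencies]
# The "derive" feature re-exports the Serialize/Deserialize derive macros from serde.
serde = { version = "1.0", features = ["derive"] }
```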
The downside to relying on serialization is that deserialization always involves a clone of the data, which can adversely impact performance. For example, if your structures contain lots of strings, timely dataflow will create an allocation for each string, even if you do not plan to use all of them.
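Why a clone? A 'static record type cannot keep borrowing from the buffer its bytes arrived in, so decoding has to copy every string out into a fresh allocation. The std-only sketch below contrasts owned and borrowed decoding. The length-prefixed wire format and all function names are hypothetical; this is not the format timely or serde actually uses.

```rust
// Hypothetical wire format, for illustration only: each string is written as a
// little-endian u32 byte length followed by its UTF-8 bytes.
fn encode(strings: &[&str]) -> Vec<u8> {
    let mut bytes = Vec::new();
    for s in strings {
        bytes.extend_from_slice(&(s.len() as u32).to_le_bytes());
        bytes.extend_from_slice(s.as_bytes());
    }
    bytes
}

// Borrowed decoding: each &str points into the buffer, so no per-string
// allocation happens, but the result cannot outlive the buffer.
// Assumes well-formed input (panics otherwise).
fn decode_borrowed(mut bytes: &[u8]) -> Vec<&str> {
    let mut out = Vec::new();
    while bytes.len() >= 4 {
        let len = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as usize;
        out.push(std::str::from_utf8(&bytes[4..4 + len]).expect("invalid UTF-8"));
        bytes = &bytes[4 + len..];
    }
    out
}

// Owned decoding: copies every string out of the buffer into its own heap
// allocation. This is the only option for a 'static record type.
fn decode_owned(bytes: &[u8]) -> Vec<String> {
    decode_borrowed(bytes).into_iter().map(|s| s.to_string()).collect()
}

fn main() {
    let bytes = encode(&["alpha", "beta", "gamma"]);
    let borrowed = decode_borrowed(&bytes); // no per-string allocation
    let owned = decode_owned(&bytes); // one new String allocation per string
    assert_eq!(owned, borrowed);
    drop(bytes);
    // owned is still usable here; borrowed is not, since it pointed into bytes.
    println!("{:?}", owned);
}
```

The owned version pays for every string up front, which is exactly the cost described above when most of those strings are never looked at.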
An example
Let's imagine you would like to play around with a tree data structure as something you might send around in timely dataflow. I've written the following candidate example:
extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
            .map(|x| TreeNode::new(x))
            .inspect(|x| println!("seen: {:?}", x));
    });
}

// A tree structure.
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        Self { data, children: Vec::new() }
    }
}
This doesn't work. You'll probably get two errors: that TreeNode doesn't implement Clone, and that it doesn't implement Debug. Timely data types need to implement Clone, and our attempt to print out the trees requires an implementation of Debug. We can create these implementations by decorating the struct declaration like so:
#[derive(Clone, Debug)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}
This works for me! We see:
Echidnatron% cargo run --example types
Compiling timely v0.8.0 (/Users/mcsherry/Projects/timely-dataflow)
Finished dev [unoptimized + debuginfo] target(s) in 5.33s
Running `target/debug/examples/types`
seen: TreeNode { data: 0, children: [] }
seen: TreeNode { data: 1, children: [] }
seen: TreeNode { data: 2, children: [] }
seen: TreeNode { data: 3, children: [] }
seen: TreeNode { data: 4, children: [] }
seen: TreeNode { data: 5, children: [] }
seen: TreeNode { data: 6, children: [] }
seen: TreeNode { data: 7, children: [] }
seen: TreeNode { data: 8, children: [] }
seen: TreeNode { data: 9, children: [] }
Echidnatron%
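One detail of the derived implementations is easy to miss: for a generic type, #[derive(Clone, Debug)] implements Clone for TreeNode<D> only when D is Clone (and Debug only when D is Debug), and the derived Clone copies the whole tree, children included. This std-only sketch checks both; the with_child method and the PartialEq derive are additions for illustration, not part of the example above.

```rust
// For a generic struct the derives are conditional: TreeNode<D> is Clone
// only if D: Clone, Debug only if D: Debug, and PartialEq only if D: PartialEq.
#[derive(Clone, Debug, PartialEq)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        Self { data, children: Vec::new() }
    }

    // Hypothetical builder method, added only for this sketch.
    fn with_child(mut self, child: TreeNode<D>) -> Self {
        self.children.push(child);
        self
    }
}

fn main() {
    // Same Debug format as the "seen:" lines in the output above.
    println!("{:?}", TreeNode::new(0u64));

    let tree = TreeNode::new(1u64).with_child(TreeNode::new(2));
    let mut copy = tree.clone();
    // The derived Clone is deep: editing the copy's child leaves tree alone.
    copy.children[0].data = 3;
    assert_eq!(tree.children[0].data, 2);
    assert_ne!(copy, tree);
}
```

The deep copy is what makes cloning safe for timely, and also what makes it potentially expensive for large trees.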
Exchanging data
Let's up the level a bit and try to shuffle our tree data between workers.
If we replace our main function with this new one:
extern crate timely;

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
            .map(|x| TreeNode::new(x))
            .exchange(|x| x.data)
            .inspect(|x| println!("seen: {:?}", x));
    });
}

#[derive(Clone, Debug)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        Self { data, children: Vec::new() }
    }
}
We get a new error, and not an especially helpful one. It says that it cannot find an exchange method, or more specifically that one exists but does not apply to the type at hand. This is because the data need to satisfy the ExchangeData trait but do not. I agree it would be better if the error messages made this clearer.
The fix is to update the source like so:
extern crate serde;
use serde::{Serialize, Deserialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}
and make sure to include the serde crate with the derive feature enabled.
Echidnatron% cargo run --example types
Finished dev [unoptimized + debuginfo] target(s) in 0.07s
Running `target/debug/examples/types`
seen: TreeNode { data: 0, children: [] }
seen: TreeNode { data: 1, children: [] }
seen: TreeNode { data: 2, children: [] }
seen: TreeNode { data: 3, children: [] }
seen: TreeNode { data: 4, children: [] }
seen: TreeNode { data: 5, children: [] }
seen: TreeNode { data: 6, children: [] }
seen: TreeNode { data: 7, children: [] }
seen: TreeNode { data: 8, children: [] }
seen: TreeNode { data: 9, children: [] }
Echidnatron%
Great news!
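What did exchange do with those trees? Each record's key, here x.data, chooses the worker that receives it, and with more than one worker that receiver may live on another thread or in another process, which is why the data must be ExchangeData. The sketch below models only the routing step in plain Rust. The route function and its modulo rule are assumptions for illustration; timely's actual routing function may differ.

```rust
#[derive(Clone, Debug, PartialEq)]
struct TreeNode<D> {
    data: D,
    children: Vec<TreeNode<D>>,
}

impl<D> TreeNode<D> {
    fn new(data: D) -> Self {
        Self { data, children: Vec::new() }
    }
}

// Simplified, hypothetical model of an exchange: the key closure maps each
// record to a u64, and the key picks one of `peers` destination buckets.
// Modulo routing is an assumption made for illustration. `peers` must be > 0.
fn route<D, F>(records: Vec<D>, peers: usize, key: F) -> Vec<Vec<D>>
where
    F: Fn(&D) -> u64,
{
    let mut buckets: Vec<Vec<D>> = (0..peers).map(|_| Vec::new()).collect();
    for record in records {
        let target = (key(&record) % peers as u64) as usize;
        buckets[target].push(record);
    }
    buckets
}

fn main() {
    let nodes: Vec<TreeNode<u64>> = (0..10).map(TreeNode::new).collect();
    // Same key function as the example above: route by x.data.
    let buckets = route(nodes, 2, |x| x.data);
    for (worker, bucket) in buckets.iter().enumerate() {
        let keys: Vec<u64> = bucket.iter().map(|x| x.data).collect();
        println!("worker {}: {:?}", worker, keys);
    }
}
```

In a real multi-worker run, each bucket would have to be handed to a different worker rather than kept in one Vec, and that hand-off is the step that needs Send and serde support.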