A feature-detection example using the Intel® Threading Building Blocks flow graph
By Michael V. (Intel), Added September 9, 2011
The Intel® Threading Building Blocks (Intel® TBB) flow graph is fully supported in Intel® TBB 4.0. If you are unfamiliar with the flow graph, you can read an introduction here.
Figure 1 below shows a flow graph that implements a simple feature-detection application. A number of images enter the graph, and two alternative feature-detection algorithms are applied to each one. If either algorithm detects a feature of interest, the image is stored for later inspection. In this article, I’ll describe each node used in this graph, and then provide and describe a complete working implementation.
Figure 1: The Intel® TBB flow graph for the feature-detection example.
In the figure, four different types of nodes are used to construct the application: a source_node, a queue_node, two join_nodes, and several function_nodes. Before I provide a sample implementation, I’ll give a brief overview of each node.
The first type of node is a source_node, which is shown pictorially using the symbol below. This type of node has no predecessors and is used to generate messages that are injected into the graph. It executes a user functor (or lambda expression) to generate its output. The unfilled circle on its right side indicates that it buffers its output and that this buffer can be reserved. The source_node buffers a single item. When a buffer is reserved, a value is held for the caller until the caller either consumes or releases the value. A source_node will only invoke the user functor when there is nothing currently buffered in its single-item output buffer.
The second type of node is a queue_node, which is shown using the figure below. A queue_node is an unbounded first-in first-out buffer. Like the source_node, its output is reservable.
The third type of node, of which there are two variants used in the example, is the join_node. A join_node has multiple input ports and generates a single output tuple that contains a value received at each port. A join_node can use different policies at its input ports: queueing, reserving, or tag_matching. A queueing join_node greedily consumes all messages as they arrive and generates an output whenever it has at least one item in each input queue. A reserving join_node only attempts to generate a tuple when it can successfully reserve an item at each input port. If it cannot successfully reserve all inputs, it releases all of its reservations and will only try again when it receives a message from the port or ports it was previously unable to reserve. Lastly, a tag_matching join_node uses hash tables to buffer messages in its input ports. When it has received messages at each port that have matching keys, it creates an output tuple with these messages. Shown below are the symbols for the reserving and tag_matching join_nodes used in Figure 1.
The final node type used in this example is a function_node; it uses the symbol shown below. A function_node executes a user-provided functor or lambda expression on incoming messages, passing the return value to its successors. A function_node can be constructed with a limited or unlimited allowable concurrency level. A function_node with unlimited concurrency creates a task to apply its functor to each message as it arrives. If a function_node has limited concurrency, it will create tasks only up to its allowed concurrency level, buffering messages at its input as necessary so that they are not dropped.
To save on space, I’m going to fake the image processing parts of this example. In particular, each image will simply be an array of characters. An image that contains the character ‘A’ has a feature recognizable by algorithm A, and an image that contains the character ‘B’ has a feature recognizable by algorithm B. So in the post, I will provide the complete code to construct and execute a flow graph that has the structure shown in Figure 1, but I’ll replace the actual computations with trivial ones.
Below is the declaration of struct image, as well as the trivial implementations that can be used as the bodies of the function nodes. The function get_next_image will be used by the source_node to generate images for processing. You might note that in get_next_image, every 11th image will have a feature detectable by algorithm A and every 13th image will contain a feature detectable by algorithm B. The function preprocess_image adds an offset of 32 to each character, which turns ‘A’ into ‘a’ and ‘B’ into ‘b’. The functions detect_with_A and detect_with_B then do the trivial search for the characters ‘a’ and ‘b’, respectively.
#include <cstring>
#include <cstdio>

const int num_image_buffers = 100;
int image_size = 10000000;

struct image {
    const int N;
    char *data;
    image();
    image( int image_number, bool a, bool b );
    ~image() { delete [] data; } // free the character array along with the image
};

// Uninitialized image, used as an output buffer
image::image() : N(image_size) {
    data = new char[N];
}

// Input image; the feature markers are placed at the end of the array
image::image( int image_number, bool a, bool b ) : N(image_size) {
    data = new char[N];
    memset( data, '\0', N );
    data[0] = (char)image_number - 32;
    if ( a ) data[N-2] = 'A';
    if ( b ) data[N-1] = 'B';
}
int img_number = 0;
int num_images = 64;
const int a_frequency = 11;
const int b_frequency = 13;

// Returns the next image, or NULL once num_images have been generated
image *get_next_image() {
    bool a = false, b = false;
    if ( img_number < num_images ) {
        if ( img_number%a_frequency == 0 ) a = true;
        if ( img_number%b_frequency == 0 ) b = true;
        return new image( img_number++, a, b );
    } else {
        return NULL;
    }
}
// Adds 32 to every character, so 'A' becomes 'a' and 'B' becomes 'b'
void preprocess_image( image *input_image, image *output_image ) {
    for ( int i = 0; i < input_image->N; ++i ) {
        output_image->data[i] = input_image->data[i] + 32;
    }
}

bool detect_with_A( image *input_image ) {
    for ( int i = 0; i < input_image->N; ++i ) {
        if ( input_image->data[i] == 'a' )
            return true;
    }
    return false;
}

bool detect_with_B( image *input_image ) {
    for ( int i = 0; i < input_image->N; ++i ) {
        if ( input_image->data[i] == 'b' )
            return true;
    }
    return false;
}

// Rescans the image and prints what was found next to what the detectors reported
void output_image( image *input_image, bool found_a, bool found_b ) {
    bool a = false, b = false;
    int a_i = -1, b_i = -1;
    for ( int i = 0; i < input_image->N; ++i ) {
        if ( input_image->data[i] == 'a' ) { a = true; a_i = i; }
        if ( input_image->data[i] == 'b' ) { b = true; b_i = i; }
    }
    printf("Detected feature (a,b)=(%d,%d)=(%d,%d) at (%d,%d) for image %p:%d\n",
           a, b, found_a, found_b, a_i, b_i, (void *)input_image, input_image->data[0]);
}
The code to implement the flow graph itself is shown in function main below. I will interject text in the middle of the listing of main to describe the use of the flow graph components. If you want to build this example, you can just cut and paste the code snippets above and below linearly into a single file.
int num_graph_buffers = 8;
#include "tbb/flow_graph.h"
using namespace tbb;
using namespace tbb::flow;
int main() {
First, a graph g is created. All of the nodes will belong to this single graph. A few typedefs are provided to make it easier to refer to the outputs of the join nodes:
    graph g;
    typedef std::tuple< image *, image * > resource_tuple;
    typedef std::pair< image *, bool > detection_pair;
    typedef std::tuple< detection_pair, detection_pair > detection_tuple;
Next, the queue_node that holds the image buffers is created, along with the two join nodes. Again, note that resource_join uses the reserving policy, while detection_join uses the tag_matching policy. To use tag_matching, the user must provide functors that can extract the tag from the item; these appear as the additional arguments to the constructor. Here the tag is the image pointer stored in each detection_pair, cast to size_t.
    queue_node< image * > buffers( g );
    join_node< resource_tuple, reserving > resource_join( g );
    join_node< detection_tuple, tag_matching > detection_join( g,
        [](const detection_pair &p) -> size_t { return (size_t)p.first; },
        [](const detection_pair &p) -> size_t { return (size_t)p.first; } );
Next, the nodes that execute the user’s code are created, including the source_node and the four function_nodes. The user’s code is passed to each node using a C++ lambda expression (a function object could also be used). For the most part, each lambda expression is a bit of wrapper code that calls the functions that were described earlier, obtaining inputs and creating outputs as necessary. The make_edge calls wire together the nodes as shown in Figure 1.
    source_node< image * > src( g,
        []( image* &next_image ) -> bool {
            next_image = get_next_image();
            if ( next_image ) return true;
            else return false;
        }
    );
    make_edge(src, input_port<0>(resource_join) );
    make_edge(buffers, input_port<1>(resource_join) );

    function_node< resource_tuple, image * >
        preprocess_function( g, unlimited,
            []( const resource_tuple &in ) -> image * {
                image *input_image = std::get<0>(in);
                image *output_image = std::get<1>(in);
                preprocess_image( input_image, output_image );
                delete input_image;
                return output_image;
            }
        );
    make_edge(resource_join, preprocess_function );

    function_node< image *, detection_pair >
        detect_A( g, unlimited,
            []( image *input_image ) -> detection_pair {
                bool r = detect_with_A( input_image );
                return std::make_pair( input_image, r );
            }
        );

    function_node< image *, detection_pair >
        detect_B( g, unlimited,
            []( image *input_image ) -> detection_pair {
                bool r = detect_with_B( input_image );
                return std::make_pair( input_image, r );
            }
        );
    make_edge(preprocess_function, detect_A );
    make_edge(detect_A, input_port<0>(detection_join) );
    make_edge(preprocess_function, detect_B );
    make_edge(detect_B, input_port<1>(detection_join) );

    function_node< detection_tuple, image * >
        decide( g, serial,
            []( const detection_tuple &t ) -> image * {
                const detection_pair &a = std::get<0>(t);
                const detection_pair &b = std::get<1>(t);
                image *img = a.first;
                if ( a.second || b.second ) {
                    output_image( img, a.second, b.second );
                }
                return img;
            }
        );
    make_edge(detection_join, decide);
    make_edge(decide, buffers);
Because of the reserving join node at the front of the graph, the graph will remain idle until there are image buffers available in the buffers queue. The for-loop below allocates buffers and puts them into the queue. After the loop, the call to g.wait_for_all() will block until the graph becomes idle again, which happens once all images have been processed.
    // Put image buffers into the buffer queue
    for ( int i = 0; i < num_graph_buffers; ++i ) {
        image *img = new image;
        buffers.try_put( img );
    }
    g.wait_for_all();
When the graph is idle, all of the buffers will again be in the buffers queue. The queue_node therefore needs to be drained and the buffers deallocated:
    for ( int i = 0; i < num_graph_buffers; ++i ) {
        image *img = NULL;
        if ( !buffers.try_get(img) )
            printf("ERROR: lost a buffer\n");
        else
            delete img;
    }
    return 0;
}
I hope that this feature-detection example demonstrates how a reasonably complex flow graph that passes messages between nodes can be implemented. To learn more about the new features in Intel® Threading Building Blocks 4.0, visit http://www.threadingbuildingblocks.org; to learn more about the Intel® TBB flow graph, check out the other blog articles at /en-us/blogs/tag/flow_graph/.