orocos lock free data object

lock free 的 circular bufer, 一个 producer + 多个 cosumer

namespace RTT
{ namespace base {

    /**
     * @brief This DataObject is a Lock-Free implementation,
     * such that reads and writes can happen concurrently without priority
     * inversions.
     *
     * When there are more writes than reads, the last write will
     * be returned. The internal buffer can get full if too many
     * concurrent reads are taking to long. In that case, each new
     * read will read the element the previous read returned.
     *
     * @verbatim
     * The following Truth table applies when a Low Priority thread is
     * preempted by a High Priority thread :
     *
     *   L\H | Set | Get |
     *   Set | Ok  | Ok  |
     *   Get | Ok  | Ok  |
     *
     * legend : L : Low Priority thread
     *          H : High Priority thread
     *          Blk: Blocks High Priority thread (bad!)
     *          internal::NA : Not allowed !
     * @endverbatim
     * Further, multiple reads may occur before, during and after
     * a write operation simultaneously. The buffer needs readers+2*writers
     * elements to be guaranteed non blocking.
     * @ingroup PortBuffers
     */
    template<class T>
    class DataObjectLockFree
        : public DataObjectInterface<T>
    {
    public:
        /**
         * The type of the data.
         */
        typedef T DataType;

        /**
         * @brief The maximum number of threads.
         *
         * When used in data flow, this is always 2.
         */
        const unsigned int MAX_THREADS; // = 2
    private:
        /**
         * Conversion of number of threads to size of buffer.
         */
        const unsigned int BUF_LEN; // = MAX_THREADS+2

        /**
         * Internal buffer structure.
         * Both the read and write pointers pointing to this struct
         * must be declared volatile, since they are modified in other threads.
         * I did not declare data as volatile,
         * since we only read/write it in secured buffers.
         */
        struct DataBuf {
            DataBuf()
                : data(), counter(), next()
            {
                oro_atomic_set(&counter, 0);
            }
            DataType data; mutable oro_atomic_t counter; DataBuf* next;
        };

        typedef DataBuf* volatile VolPtrType;
        typedef DataBuf  ValueType;
        typedef DataBuf* PtrType;

        VolPtrType read_ptr;
        VolPtrType write_ptr;

        /**
         * A 3 element Data buffer
         */
        DataBuf* data;
    public:

        /**
         * Construct a DataObjectLockFree by name.
         *
         * @param _name The name of this DataObject.
         * @param initial_value The initial value of this DataObject.
         */
        DataObjectLockFree( const T& initial_value = T(), unsigned int max_threads = 2 )
            : MAX_THREADS(max_threads), BUF_LEN( max_threads + 2),
              read_ptr(0),
              write_ptr(0)
        {
            data = new DataBuf[BUF_LEN];
            read_ptr = &data[0];
            write_ptr = &data[1];
            data_sample(initial_value);
        }

        ~DataObjectLockFree() {
            delete[] data;
        }

        /**
         * Get a copy of the data.
         * This method will allocate memory twice if data is not a value type.
         * Use Get(DataType&) for the non-allocating version.
         *
         * @return A copy of the data.
         */
        virtual DataType Get() const {DataType cache; Get(cache); return cache; }

        /**
         * Get a copy of the Data (non allocating).
         * If pull has reserved enough memory to store the copy,
         * no memory will be allocated.
         *
         * @param pull A copy of the data.
         */
        virtual void Get( DataType& pull ) const
        {
            PtrType reading;
            // loop to combine Read/Modify of counter
            // This avoids a race condition where read_ptr
            // could become write_ptr ( then we would read corrupted data).
            do {
                reading = read_ptr;            // copy buffer location
                oro_atomic_inc(&reading->counter); // lock buffer, no more writes 不能在读的过程中进行写操作
                // XXX smp_mb
                if ( reading != read_ptr )     // if read_ptr changed,
                    oro_atomic_dec(&reading->counter); // better to start over.
                else
                    break;
            } while ( true );
            // from here on we are sure that ‘reading‘
            // is a valid buffer to read from.
            pull = reading->data;               // takes some time
            // XXX smp_mb
            oro_atomic_dec(&reading->counter);       // release buffer
        }

        /**
         * Set the data to a certain value (non blocking).
         *
         * @param push The data which must be set.
         */
        virtual void Set( const DataType& push )
        {
            /**
             * This method can not be called concurrently (only one
             * producer). With a minimum of 3 buffers, if the
             * write_ptr+1 field is not occupied, it will remain so
             * because the read_ptr is at write_ptr-1 (and can
             * not increment the counter on write_ptr+1). Hence, no
             * locking is needed.
             */
            // writeout in any case
            write_ptr->data = push;
            PtrType wrote_ptr = write_ptr;
            // if next field is occupied (by read_ptr or counter),
            // go to next and check again...
            // 环形buffer,如果是前一个还没有读完,这时候新的写进去又加进来新的读取就会有这种事情发生;
            // 如果前一个读取没有完成,后续最多能再写两次读两次(buffer大小为4).
            while ( oro_atomic_read( &write_ptr
            ->next->counter ) != 0 || write_ptr->next == read_ptr )
                {
                    write_ptr = write_ptr->next;
                    if (write_ptr == wrote_ptr)
                        return; // nothing found, to many readers !
                }

            // we will be able to move, so replace read_ptr
            read_ptr  = wrote_ptr;
            write_ptr = write_ptr->next; // we checked this in the while loop
        }

        virtual void data_sample( const DataType& sample ) {
            // prepare the buffer.
            for (unsigned int i = 0; i < BUF_LEN-1; ++i) {
                data[i].data = sample;
                data[i].next = &data[i+1];
            }
            data[BUF_LEN-1].data = sample;
            data[BUF_LEN-1].next = &data[0];
        }
    };
}}
时间: 2024-08-29 06:34:56

orocos lock free data object的相关文章

Operating System: Three Easy Pieces --- Lock Concurrent Data Structures (Note)

Before moving beyong locks, we will first describe how to use locks in some common data structures. Adding locks to a data structure to make it usable by threads makes the structure thread safe. Of course, exactly how such locks are added determines

PDO(PHP Data Object),Mysqli,以及对sql注入等问题的解决

这篇是上一篇 http://www.cnblogs.com/charlesblc/p/5987951.html 的续集. 看有的文章提到mysqli和PDO都支持多重查询,所以下面的url会造成表数据被删. http://localhost:8080/test.php?id=3;delete%20from%20users 可是我在mysql版本的函数,上面的sql都不能执行.是不是不支持多重查询了? 这篇文章 http://www.runoob.com/php/php-mysql-connect

php之PDO (PHP DATA OBJECT)

从 PHP 5.1 开始附带了 PDO,PHP 数据对象 (PDO) 扩展为PHP访问数据库定义了一个轻量级的一致接口.PDO 提供了一个 数据访问 抽象层,这意味着,不管使用哪种数据库(比如mysql,oracle,mssql-),都可以用相同的函数(方法)来查询和获取数据. 1.创建PDO对象 使用PDO扩展必须在php.ini文件中打开相应的扩展,下图打开了pdo_mysql的扩展: 那怎么创建一个pdo对象呢? <?php //$dsn = "mysql:host=服务器地址/名称

PHP数据库抽象层--PDO(PHP Data Object) [一]

1.简介:(PDO数据库访问抽象层,统一各种 数据库的访问接口 ) PHP 数据对象 (PDO) 扩展为PHP访问数据库定义了一个轻量级的一致接口.实现 PDO 接口的每个数据库驱动可以公开具体数据库的特性作为标准扩展功能. 注意利用 PDO 扩展自身并不能实现任何数据库功能:必须使用一个 具体数据库的 PDO 驱动 来访问数据库服务. PDO 提供了一个 数据访问 抽象层,这意味着,不管使用哪种数据库,都可以用相同的函数(方法)来查询和获取数据. PDO 不不提供 数据库 抽象层:它不会重写

ORA-04021:timeout occurred while waiting to lock object

编译某存储过程 ORA-04021 timeout occurred while waiting to lock object stringstringstringstringstring Cause: While waiting to lock a library object, a timeout is occurred. Action: Retry the operation later. 查找是否有用户类型的锁,即 TM                DML排队 TX          

MFC data forwarding to main thread via PostMessage

MFC data forwarding to main thread via PostMessage          up vote3down votefavorite 3 I have a C++/MFC application I need to restructure. The app used to process most of the data on the main thread, therefore blocking the input, and now I want to c

【Java并发系列04】线程锁synchronized和Lock和volatile和Condition

img { border: solid 1px } 一.前言 多线程怎么防止竞争资源,即防止对同一资源进行并发操作,那就是使用加锁机制.这是Java并发编程中必须要理解的一个知识点.其实使用起来还是比较简单,但是一定要理解. 有几个概念一定要牢记: 加锁必须要有锁 执行完后必须要释放锁 同一时间.同一个锁,只能有一个线程执行 二.synchronized synchronized的特点是自动释放锁,作用在方法时自动获取锁,任意对象都可做为锁,它是最常用的加锁机制,锁定几行代码,如下: //---

同步块中对象级别的锁和类级别的锁 —— Thread synchronization, object level locking and class level locking

Java supports multiple threads to be executed. This may cause two or more threads to access the same fields or objects. Synchronization is a process which keeps all concurrent threads in execution to be in synch. Synchronization avoids memory consist

Library cache lock/pin详解

Library cache lock/pin 一.概述 ---本文是网络资料加metalink 等整理得来一个实例中的library cache包括了不同类型对象的描述,如:游标,索引,表,视图,过程,等等. 这些对象不能在他们被使用的时候改变,他们在被使用的时候会被一种library locks and pins的机制锁住. 一个会话中,需要使用一个对象,会在该对象上先得到一个library lock(null, shared or exclusive模式的)这是为了,防止其他会话也访问这个对