DeltaCV之共享内存篇

这篇文章我们来认识一下线程间通信的一种手段—共享内存,共享内存由于其特殊性—共享内存是存在与每个进程的地址空间中的,通俗点就是这部分数据是对每个进程可见的,这样每个进程都可以在一定条件下直接操作共享内存中的数据,避免了数据的复制等耗时的操作,这也是共享内存比其他几种IPC机制(信号量、管道、消息队列等)的通信方式效率高的原因,但是共享内存也有缺点,那就是需要独立实现消息的同步机制。 本文将完成一个基础的共享内存类模板,包含在DeltaCV项目中,欢迎star,fork。

概述

在boost/Interprocess中,提供了很多关于操作系统底层的进程间通信的抽象层,它使用C++将操作系统的底层接口进行封装,并消除了不同操作系统之间各种各样的接口带来的开发困难等问题。

共享内存

先介绍一种最普通的共享内存创建方式,boost::interprocess::shared_memory_object类。

1
2
3
4
5
#include <boost/interprocess/shared_memory_object.hpp> 
// boost::interprocess::open_or_create: 第一个参数说明了当前的操作是什么,直接可以根据字面意思猜测到,open_or_create指如果尝试打开名字为DeltaCV的共享内存失败时,就创建它,当然也有create_only,open_only操作。
// "DeltaCV": 表明当前我们操作的共享内存的名字为DeltaCV。
// boost::interprocess::read_write: 所有进程对此共享内存的操作权限(暂且可以这样认为,感觉不太严谨),read_write即可读写,同时还有read_only供选择。
boost::interprocess::shared_memory_object shm(boost::interprocess::open_or_create, "DeltaCV", boost::interprocess::read_write); 

共享内存创建后大小为0,我们使用truncate()方法来为共享内存设定大小,单位为字节。

1
2
// 为刚才创建的共享内存开辟1024个字节大小的空间。
shm.truncate(1024);

然而刚才我们创建的共享内存虽然对所有进程可见,但是现在还不能直接操作,因为每个进程都有自己独立的地址空间,我们还需要将上面的共享内存映射进当前进程的地址空间中,可以使用mapped_region()方法完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <boost/interprocess/mapped_region.hpp> 
//shm: 传入上文实例化的共享内存对象
//boost::interprocess::read_write: 当前进程对此共享内存的操作权限
/* 
	函数的原型如下
			mapped_region region(
				shm,         //Memory-mappable object  
   				mode,        //Access mode  
				start,       //Offset from the beginning of shm 
				end          //Length of the region  
   			);  
   	当你省略后面两个参数时,将默认映射整个共享内存对象
*/
boost::interprocess::mapped_region region(shm, boost::interprocess::read_write);
// 共享内存的其实地址
std::cout<<region.get_address()<<std::endl;
// 共享内存块的大小
std::cout<<region.get_size()<<std::endl;

接下来就是开始向共享内存中存数据了,跟我们普通的指针操作一样.

1
2
3
4
5
6
// 先将地址的指针强制转换成int指针,因为我们要储存的是int变量
int *n = static_cast<int*>(region.get_address()); 
// 和普通的指针操作一样,这里讲1存放进共享内存中
*n = 1;

// 如果在其它程序中我们已经存放过数据了,而想要取数据,则*n就是我们要取的数据了

那如何删除我们创建的共享内存呢?多数linux系统中,如果不主动删除共享内存,它会一直存在到系统重启.

1
2
//返回值指示了是否删除成功
bool removed = boost::interprocess::shared_memory_object::remove("DeltaCV"); 

托管共享内存

然而上述的方式几乎不会用到,因为它每次需要按单个字符的形式读写内存,所以绝大多数情况下我们会使用托管共享内存方式,它会以内存申请的方式对共享内存对象进行初始化.

1
2
3
4
5
6
7
8
9
#include <boost/interprocess/managed_shared_memory.hpp> 
// boost::interprocess::open_or_create: 含义同上
// "Highscore": 共享内存的名称
// 1024: 共享内存的大小
boost::interprocess::managed_shared_memory managed_shm(boost::interprocess::open_or_create, "DeltaCV", 1024);
// 创建了一个包含10个元素的int型数组,起名为"Integer",且值初始化为1,此时n为数组首元素的地址
int *n = managed_shm.construct<int>("Integer")[10](1); //当"Integer"之前不存在

std::pair<int*,size_t > n = managed_shm.find<int>("Integer")[10](1); //当"Integer"已经存在时,我们使用find来找到这个数据,可见此时返回的是pair类型数据,first保存的是数据首元素地址,second保存的是整个数据的长度.

删除托管共享内存

1
boost::interprocess::shared_memory_object::remove("DeltaCV");

互斥对象

既然共享内存对所有进程都可见,那么怎么保证共享内存的读写不会相互干扰呢?那就是使用互斥对象,当共享内存被使用时,互斥对象被占用,其他进程想要操作同一共享内存时,就需要等待互斥对象释放,这样就保证了共享内存的正确读写,类似于多线程中的锁.

互斥对象的声明

1
2
3
4
5
6
7
8
9
10
#include <boost/interprocess/sync/interprocess_mutex.hpp> 
// 声明了一个名为"mtx"互斥对象,使用的是find_or_construct,含义同上
boost::interprocess::interprocess_mutex *mtx = managed_shm.find_or_construct<boost::interprocess::interprocess_mutex>("mtx")(); 
// 上锁,也就是占用互斥对象,一般每个共享变量对应一个互斥对象
mtx->lock(); 
/* 
	TODO SOMETHING
*/
//解锁
mtx->unlock(); 

DeltaCV中的共享内存类模板

至此,关于boost下的共享内存编程就简单的介绍一下,应该能满足日常的使用了,下面我将介绍我在DeltaCV中封装的共享内存类模板,整体框架类似与ROS中的topic形式,提供发布者和订阅者.完整代码见DeltaCV.

发布器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
template <typename T1>
class shm_publisher{
public:
    shm_publisher(const char* node_name, const int RECEIVE_DATA_LENGTH)
                    :shm_name(string(node_name)), //创建的
                     per_data_bytes(sizeof(T1)),
                     data_length(RECEIVE_DATA_LENGTH+1)
    {
        lock_name = string(node_name)+"_lock"; //定义互斥变量的名称
        data_name = string(node_name)+"_data"; //定义数据的名称
		update_name = string(node_name)+"_update"; //更新标志位
		
		boost::interprocess::shared_memory_object::remove(shm_name.c_str()); 	//首先检查内存是否被释放
        boost::interprocess::named_mutex::remove(lock_name.c_str()); //检查互斥变量是否被释放
        //托管共享内存
        managed_shm = new boost::interprocess::managed_shared_memory(
                        boost::interprocess::create_only,
                        shm_name.c_str(),
                        per_data_bytes*data_length + 4 + 1024);
        // 互斥变量
        named_mtx = new boost::interprocess::named_mutex(
                        boost::interprocess::create_only,
                        lock_name.c_str());
        // 变量
        user_data = managed_shm->construct<T1>(data_name.c_str())[data_length](0);
        update_flag = managed_shm->construct<int>(update_name.c_str())[1](0);
    }
    ~shm_publisher() 
    {
        boost::interprocess::shared_memory_object::remove(shm_name.c_str());
        boost::interprocess::named_mutex::remove(lock_name.c_str());
        managed_shm->destroy<T1>(data_name.c_str());
        managed_shm->destroy<int>(update_name.c_str());
    }
    
    // 发布函数如下:
    void broadcast(T1* data)
    {
        named_mtx->lock();
        memcpy(user_data, data, per_data_bytes*data_length);
        *update_flag = 1;
        named_mtx->unlock();
    }
    
private:
    T1* user_data;
    int* update_flag;
    std::string shm_name;
    std::string lock_name;
    std::string data_name;
    std::string update_name;
    
    boost::interprocess::managed_shared_memory* managed_shm;
    boost::interprocess::named_mutex* named_mtx;
    
    int per_data_bytes,data_length;
};

订阅器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
template <typename T2>
class shm_subscriber{
public:
	shm_subscriber(const char* node_name)
                    :shm_name(string(node_name)),
                    per_data_bytes(sizeof(T2))
{
	lock_name = string(node_name) + "_lock"; //这里的命名规则与发布器对应
	data_name = string(node_name) + "_data";
	update_name = string(node_name)+"_update";

	//托管共享内存
	managed_shm = new boost::interprocess::managed_shared_memory(
                        boost::interprocess::open_only,
                        shm_name.c_str());
	//获得互斥对象
	named_mtx = new boost::interprocess::named_mutex(
                        boost::interprocess::open_only,
                        lock_name.c_str());
	//找到两个变量bianlaing
	user_data = managed_shm->find<T2>(data_name.c_str());
	update_flag = managed_shm->find<int>(update_name.c_str());
}

// 返回值表示当前取得值是否更新
bool get(T2* data)
{
	named_mtx->lock();
	if(update_flag.first[0]==1) //检查标志位是否置1
	{
		update_flag.first[0]=0;
		memcpy(data, user_data.first,user_data.second*per_data_bytes);
		named_mtx->unlock();
		return true;
	} else {
		named_mtx->unlock();
		return false;
	}
}

private:
    std::pair<T2*,size_t > user_data;
    std::pair<int*,size_t > update_flag;

    std::string shm_name;
    std::string lock_name;
    std::string data_name;
    std::string update_name;
    uint8_t per_data_bytes;

    boost::interprocess::managed_shared_memory* managed_shm;
    boost::interprocess::named_mutex* named_mtx;
};

小技巧

代码中,我额外添加了一个标志位update_flag,用来指示当前取到的数据是否是最新值,因为只要发布器那边存入一次数据,update_flag这个变量就置为1,而订阅器每取一次数据,就讲update_flag置0,所以当订阅器的获取频率超过发布器时,订阅器取到的数据一部分是未更新的数据,则订阅器能通过get()的返回值来选择是否使用本次获取的数据(若返回值为true,则为最新值,若为false,则说明自上次订阅器取过值后到这次取值之间,发布器没有存入过新的数据).

演示

分别新建两个main函数,代表两个进程,其中一个作为发布器,每5s发布一次数据,另外一个作为订阅器,每1s获取一次数据.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// main1.cpp
#include <iostream>
#include "shm.hpp"
int a[2] = {4,5};
int main() {
    deltaCV::shm_publisher<int> pub("test",2);
    while(1)
    {
        p.broadcast(a);
        sleep(5);
    }
    return 0;
}

// main2.cpp
#include <iostream>
#include "shm.hpp"
int a[2];
int main() {
    deltaCV::shm_subscriber<int> sub("test");
    while(1)
    {
        bool res = sub.get(a);
        cout<<"update: "<<res<<" ,a[0]: "<<a[0]<<" ,a[1]: "<<a[1]<<endl;
        sleep(1);
    }
    return 0;
}

最终输出为:(可以看到5s内,订阅器有4次获取的是未更新的值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
update: 1 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 1 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 1 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5

参考:http://zh.highscore.de/cpp/boost/interprocesscommunication.html