基于steem的群签名方案实现3:节点间互相发送消息
这部分主要涉及到steem的p2p_plugin这个插件和net部分,在原先消息的基础上,增加了两种消息:vss_message和vk_message,这两种消息的发送方式有些不同,vss_meassage是使用vss分享秘钥时,针对不同节点的秘钥分享,因此对其他节点发送其对应的应该获得的消息,这个消息应该通过可信信道发送,使得其他人无法得知消息的内容,但是,在本系统中并未对消息进行加密。vk_message是节点需要对其他节点进行广播的消息。
p2p_plugin的工作过程
p2p_plugin中的
connect_to_p2p_network
,启动多个线程,不断进行节点,发送消息,同步交易,区块的工作:void node_impl::connect_to_p2p_network() { VERIFY_CORRECT_THREAD(); assert(_node_public_key != fc::ecc::public_key_data()); assert(!_accept_loop_complete.valid() && !_p2p_network_connect_loop_done.valid() && !_fetch_sync_items_loop_done.valid() && !_fetch_item_loop_done.valid() && !_advertise_inventory_loop_done.valid() && !_terminate_inactive_connections_loop_done.valid() && !_fetch_updated_peer_lists_loop_done.valid() && !_bandwidth_monitor_loop_done.valid() && !_dump_node_status_task_done.valid() && !_broadcast_vk_message_loop_done.valid()); if (_node_configuration.accept_incoming_connections) _accept_loop_complete = async_task( [=](){ accept_loop(); }, "accept_loop");//将已知的节点加入handshaking_connections,send_hello_message _p2p_network_connect_loop_done = async_task( [=]() { p2p_network_connect_loop(); }, "p2p_network_connect_loop" );//不断监听链接请求,将新节点加入handshaking_connections,connect_to_endpoint,connect_to_task(节点间相互发送消息,获取节点状态),read_loop,on_message,根据收到的消息进行处理,建立连接等 _fetch_sync_items_loop_done = async_task( [=]() { fetch_sync_items_loop(); }, "fetch_sync_items_loop" ); _fetch_item_loop_done = async_task( [=]() { fetch_items_loop(); }, "fetch_items_loop" ); _advertise_inventory_loop_done = async_task( [=]() { advertise_inventory_loop(); }, "advertise_inventory_loop" ); _terminate_inactive_connections_loop_done = async_task( [=]() { terminate_inactive_connections_loop(); }, "terminate_inactive_connections_loop" ); _fetch_updated_peer_lists_loop_done = async_task([=](){ fetch_updated_peer_lists_loop(); }, "fetch_updated_peer_lists_loop");//address_request_message,更新自己的地址列表,将别的节点的已知节点地址加入自己的潜在列表,在p2p_network_connect_loop中进行链接 _bandwidth_monitor_loop_done = async_task([=](){ bandwidth_monitor_loop(); }, "bandwidth_monitor_loop"); _dump_node_status_task_done = async_task([=](){ dump_node_status_task(); }, "dump_node_status_task"); _broadcast_vk_message_loop_done = async_task([=] () {broadcast_vk_message_loop();}, "broadcast vk message");// 发送vk,函数定下在下文 }
这段代码中最重要的函数是
p2p_network_connect_loop
这个函数将自己已知的加点加入handshaking_connections的队列,调用connect_to_endpoint
与节点建立连接,-->connect_to_task
-->read_loop
不断监听该节点发送的消息-->on_message
处理消息(收到),其他节点广播的交易,区块也是通过on_message
处理。增加节点的编号属性
由于发送vss消息时,需要对应节点的编号发送相应的vss_message,所以一个节点在与另一个节点建立链接的时必须告诉节点,它的编号是多少,因此,我把节点的编号的通知放在hello_message中
struct hello_message { static const core_message_type_enum type; std::string user_agent; uint32_t core_protocol_version; fc::ip::address inbound_address; uint16_t inbound_port; uint16_t outbound_port; node_id_t node_public_key; fc::ecc::compact_signature signed_shared_secret; fc::variant_object user_data; //--------group-signature------------------- uint16_t group_number; //节点的编号,从1开始 //-------------------------------------------- hello_message() {} hello_message(const std::string& user_agent, uint32_t core_protocol_version, const fc::ip::address& inbound_address, uint16_t inbound_port, uint16_t outbound_port, const node_id_t& node_public_key, const fc::ecc::compact_signature& signed_shared_secret, const fc::variant_object& user_data, uint16_t group_number ) : user_agent(user_agent), core_protocol_version(core_protocol_version), inbound_address(inbound_address), inbound_port(inbound_port), outbound_port(outbound_port), node_public_key(node_public_key), signed_shared_secret(signed_shared_secret), user_data(user_data), group_number(group_number) {} };
在hello_message的处理函数,
on_hello_message
中,增加记录节点编号的代码originating_peer->group_number = hello_message_received.group_number;
vss_message, vk_message 增加,发送,接收
定义vss_message,vk_message 在
libraries/net/include/graphene/net/core_messages.hpp
中增加vss_message和vk_message的类型定义。struct vss_message { static const core_message_type_enum type; std::string si; std::string ti; std::vector<std::string> Ei; uint16_t number; vss_message(){} vss_message(std::string s, std::string t, std::vector<std::string>Ei,uint16_t number): si(s),ti(t),Ei(Ei),number(number) {} }; struct vk_message { static const core_message_type_enum type; std::string vk; uint16_t number; vk_message(){} vk_message(std::string vk, uint16_t number): vk(vk), number(number) {} }; FC_REFLECT(graphene::net::vss_message,(si)(ti)(Ei)(number)) FC_REFLECT(graphene::net::vk_message,(vk)(number))
在
libraries/net/core_messages.cpp
中,增加类型定义:const core_message_type_enum vss_message::type = core_message_type_enum::vss_message_type; const core_message_type_enum vk_message::type = core_message_type_enum::vk_message_type;
定义send_vss_message
在
libraries/net/node.cpp
中定义,向某个节点发送vss_message的函数:void node_impl::send_vss_message(const peer_connection_ptr& peer) { std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(),_active_connections.end()); for( const peer_connection_ptr& active_peer: original_active_peers ) { try { uint16_t n = active_peer->group_number; steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>(); vss_message vss( pp.my->si_string[n-1], pp.my->ti_string[n-1], pp.my->Ei_string,_group_number); active_peer->send_message(vss); ilog("send vss message to ${peer}\n\n",("peer",active_peer->get_remote_endpoint())); } catch(const std::exception& e) { std::cerr << e.what() << '\n'; } } }
设置send_vss_message函数的调用时机,我选在在每当有一个节点加入active_connections时,向该节点发送vss_message
void node_impl::move_peer_to_active_list(const peer_connection_ptr& peer) { VERIFY_CORRECT_THREAD();//同步与互斥管理 _active_connections.insert(peer); _handshaking_connections.erase(peer); _closing_connections.erase(peer); _terminating_connections.erase(peer); fc_ilog(fc::logger::get("sync"), "New peer is connected (${peer}), now ${count} active peers", ("peer", peer->get_remote_endpoint()) ("count", _active_connections.size())); ilog("New peer is connected (${peer}), now ${count} active peers", ("peer", peer->get_remote_endpoint()) ("count", _active_connections.size())); std::cout<<"this is active list\n\n"; send_vss_message(peer); //不断检查是否形成了vk ilog("send a vss message to ${peer}",("peer",peer->get_remote_endpoint())); }
在node.cpp中,定义vss_message的处理函数,on_vss_message,vk_message的处理函数,on_vk_message
void node_impl::on_vss_message(peer_connection* originating_peer, const vss_message& vss_message_received) { VERIFY_CORRECT_THREAD(); ilog("received a vss message from ${peer}",("peer",originating_peer->get_remote_endpoint())); uint16_t n = vss_message_received.number -1; for(int i = 0; i< (int)vss_message_received.Ei.size(); i++){ std::cout<<vss_message_received.Ei[i]<<"\n"; } steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>(); if(pp.my->si_string_received[n] == "" || pp.my->si_string_received[n] != vss_message_received.si){ if(pp.my->VerifyShare(vss_message_received.si, vss_message_received.ti, vss_message_received.Ei))//验证 { pp.my->si_string_received[n] = vss_message_received.si;//验证通过后,记录到group_signature_plugin pp.my->ti_string_received[n] = vss_message_received.ti; pp.my->mpkGen(); trigger_broadcast_vk_message_loop(); } else{ std::cout<<"vss message is invalid\n\n"; } } else{ std::cout<<"vss mesage is already received\n\n"; } return; } void node_impl::on_vk_message(peer_connection* originating_peer, const vk_message& vk_message_received) { VERIFY_CORRECT_THREAD(); // ilog("received a vk message from ${peer}",("peer",originating_peer->get_remote_endpoint())); uint16_t n = vk_message_received.number -1; std::cout<<"received a vk message\n\n"; steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>(); if(pp.my->g_alpha_i_string_received[n] == "" || pp.my->g_alpha_i_string_received[n] != vk_message_received.vk){ std::cout<<"ok\n\n"; pp.my->g_alpha_i_string_received[n] = vk_message_received.vk; pp.my->g_alphaGen(); } else{ std::cout<<"this vk message is already received\n\n"; } }
这三个函数都需要在node.cpp中进行声明
void on_vss_message( peer_connection* originating_peer,const vss_message& vss_message_received); void send_vss_message(const peer_connection_ptr& peer); void on_vk_message(peer_connection* originating_peer,const vk_message& vk_message_received);
在on_message函数的switch-case语句中,增加vss_message和vk_message的处理函数:
case core_message_type_enum::vk_message_type: on_vk_message(originating_peer, received_message.as<vk_message>()); break; case core_message_type_enum::vss_message_type: on_vss_message(originating_peer, received_message.as<vss_message>()); break;
vk_messagen不是对某个特定的节点发送,是对所有的节点进行广播,因此,我们定义一个vk的广播函数,以及可以触发广播的函数
void node_impl::broadcast_vk_message_loop() { const steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>(); if(pp.my->vk_string != ""){ //广播vk std::cout<<"broadcast vk\n\n"; broadcast(vk_message(pp.my->vk_string, _group_number)); } if(!_node_is_shutting_down&&!_broadcast_vk_message_loop_done.canceled()) { _broadcast_vk_message_loop_done = schedule_task( [this](){broadcast_vk_message_loop();}, fc::time_point::now() + fc::minutes(GRAPHENE_NET_BROADCAST_VK_MESSAGE_INTERVAL_MINUTES), "broadcast vk message loop"); } } void node_impl::trigger_broadcast_vk_message_loop() { VERIFY_CORRECT_THREAD(); if( _retrigger_broadcast_vk_message_loop_promise ) _retrigger_broadcast_vk_message_loop_promise->set_value(); } fc::promise<void>::ptr _retrigger_broadcast_vk_message_loop_promise; fc::future<void> _broadcast_vk_message_loop_done;
相关文章
- 硅谷互联网公司的开发流程
开发流程包括这么几个阶段: OKR 的设立; 主项目及其子项目的确立; 每个子项目的生命周期; 主项目的生命周期; 收尾、维护、复盘。 第一点,OKR 的设立 所有项目的起始,都应该从 Ro
- RESTful-表述性状态转移风格
REST英文全拼:Representational State Transfer 面向资源编程 资源指的就是一类数据 产品表->就是产品资源 最重要的是如何表示一个资源 地址即
- 稳定性思考
产品功能线 0-1: 当系统从无到有的时候,首要考虑的是研发效率,功能快速迭代,满足快速增长的业务需求 1-10 系统已经搭建起来,此时考虑的是系统的稳定性。 可用性:1.隔离:区分出核心和非核心功能
- Supervisor守护队列发邮件
安装 CentOS: yum -y install supervisor Debien/Ubuntu适用:apt-get install supervisor 配置 修改主配置文件:vim /et
- 安装libsodium,让服务器支持chacha20等加密方式
用chacha20加密方式需要安装libsodium 注意:libsodium从1.0.15开始就废弃了aes-128-ctr yum install wget m2crypto git libsod
随机推荐
- 硅谷互联网公司的开发流程
开发流程包括这么几个阶段: OKR 的设立; 主项目及其子项目的确立; 每个子项目的生命周期; 主项目的生命周期; 收尾、维护、复盘。 第一点,OKR 的设立 所有项目的起始,都应该从 Ro
- RESTful-表述性状态转移风格
REST英文全拼:Representational State Transfer 面向资源编程 资源指的就是一类数据 产品表->就是产品资源 最重要的是如何表示一个资源 地址即
- 稳定性思考
产品功能线 0-1: 当系统从无到有的时候,首要考虑的是研发效率,功能快速迭代,满足快速增长的业务需求 1-10 系统已经搭建起来,此时考虑的是系统的稳定性。 可用性:1.隔离:区分出核心和非核心功能
- Supervisor守护队列发邮件
安装 CentOS: yum -y install supervisor Debien/Ubuntu适用:apt-get install supervisor 配置 修改主配置文件:vim /et
- 安装libsodium,让服务器支持chacha20等加密方式
用chacha20加密方式需要安装libsodium 注意:libsodium从1.0.15开始就废弃了aes-128-ctr yum install wget m2crypto git libsod