1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use futures::{future::FutureExt as _, prelude::*, ready, stream::Stream};
use futures_timer::Delay;
use log::debug;
use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use std::{
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use super::gossip::{GossipMessage, NeighborPacket};
use sc_network::PeerId;
use sp_runtime::traits::{Block as BlockT, NumberFor};
/// Period of the rebroadcast timer (2 minutes).
///
/// The worker's timer is reset to this duration whenever a new packet arrives and whenever it
/// fires; each time it fires, the most recently seen neighbor packet is emitted again.
const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
/// Cloneable sending half of the channel feeding a [`NeighborPacketWorker`].
///
/// Wraps an unbounded sender of `(recipients, neighbor packet)` pairs; it is handed out by
/// [`NeighborPacketWorker::new`] together with the worker that owns the receiving half.
#[derive(Clone)]
pub(super) struct NeighborPacketSender<B: BlockT>(
TracingUnboundedSender<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
);
impl<B: BlockT> NeighborPacketSender<B> {
    /// Queue `neighbor_packet` for delivery to the peers listed in `who`.
    ///
    /// The pair is pushed onto the unbounded channel wrapped by this sender. Sending is
    /// best-effort: if the channel rejects the message, the error is only logged at debug
    /// level under the `afg` target and is not reported to the caller.
    pub fn send(
        &self,
        who: Vec<sc_network::PeerId>,
        neighbor_packet: NeighborPacket<NumberFor<B>>,
    ) {
        match self.0.unbounded_send((who, neighbor_packet)) {
            Ok(()) => {},
            Err(err) => {
                debug!(target: "afg", "Failed to send neighbor packet: {:?}", err);
            },
        }
    }
}
/// Stream that forwards neighbor packets and periodically re-emits the latest one.
///
/// Packets arrive through the channel whose sending half is a [`NeighborPacketSender`]. Each
/// packet is yielded as soon as it is received and remembered in `last`; whenever `delay`
/// fires without a new packet having arrived, the remembered packet is yielded again.
pub(super) struct NeighborPacketWorker<B: BlockT> {
// Most recently received `(recipients, packet)` pair; `None` until the first packet arrives.
last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
// Rebroadcast timer, (re)armed with `REBROADCAST_AFTER`.
delay: Delay,
// Receiving half of the channel fed by `NeighborPacketSender::send`.
rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
}
// Explicitly mark the worker as `Unpin` regardless of `B`, so that `poll_next` can obtain a
// plain `&mut Self` from `Pin<&mut Self>` (it does `&mut *self`).
impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
impl<B: BlockT> NeighborPacketWorker<B> {
    /// Build a worker together with the sender used to feed it.
    ///
    /// Creates the unbounded channel named `mpsc_grandpa_neighbor_packet_worker`, arms the
    /// rebroadcast timer with `REBROADCAST_AFTER`, and starts with no remembered packet.
    /// Returns the worker (owning the receiving half) and a [`NeighborPacketSender`] wrapping
    /// the sending half.
    pub(super) fn new() -> (Self, NeighborPacketSender<B>) {
        let (sender, receiver) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
            "mpsc_grandpa_neighbor_packet_worker",
        );
        let worker = Self { last: None, delay: Delay::new(REBROADCAST_AFTER), rx: receiver };

        (worker, NeighborPacketSender(sender))
    }
}
/// Yields `(recipients, gossip message)` pairs: every packet received over the channel is
/// forwarded immediately, and the most recently received one is emitted again each time the
/// rebroadcast timer fires.
impl<B: BlockT> Stream for NeighborPacketWorker<B> {
type Item = (Vec<PeerId>, GossipMessage<B>);
// Polls the channel first and the rebroadcast timer second.
//
// - `Ready(None)` once the channel stream has ended (presumably when all senders are gone).
// - `Ready(Some(..))` for a freshly received packet, or for the remembered packet when the
//   timer has fired.
// - `Pending` when neither the channel nor the timer is ready, or when the timer fired but
//   no packet has been received yet.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = &mut *self;
match this.rx.poll_next_unpin(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some((to, packet))) => {
// A new packet supersedes the old one: restart the rebroadcast period and remember
// a copy so it can be re-emitted later.
this.delay.reset(REBROADCAST_AFTER);
this.last = Some((to.clone(), packet.clone()));
return Poll::Ready(Some((to, GossipMessage::<B>::from(packet))))
},
// Nothing new on the channel; do not return yet, the timer may have fired.
Poll::Pending => {},
};
// Returns `Pending` from here unless the timer has fired.
ready!(this.delay.poll_unpin(cx));
// The timer fired: re-arm it for the next period.
this.delay.reset(REBROADCAST_AFTER);
// Poll the re-armed timer until it reports `Pending` so that this task's waker is
// registered with it; otherwise nothing would wake the task for the next rebroadcast.
// NOTE(review): if the re-armed timer were already ready here, that tick is consumed
// without emitting anything — presumably negligible given the two-minute period.
while let Poll::Ready(()) = this.delay.poll_unpin(cx) {}
// Re-emit the remembered packet, if any has been received so far.
if let Some((ref to, ref packet)) = this.last {
return Poll::Ready(Some((to.clone(), GossipMessage::<B>::from(packet.clone()))))
}
// Timer fired before any packet was ever received: nothing to rebroadcast.
Poll::Pending
}
}