1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use std::borrow::Cow;
use crate::graph::cryptograph::{Edge, Vertex};
use crate::net::PersistentGraph;
use libp2p::kad::record::Key;
use libp2p::kad::store::Error as StoreError;
use libp2p::kad::store::RecordStore;
use libp2p::kad::{ProviderRecord, Record};
use libp2p::PeerId;
/// Kademlia record store whose records are read from and written to a
/// [`PersistentGraph`].
pub struct GlycosRecordStore {
/// Graph storage used by every record operation of this store.
graph: PersistentGraph,
}
impl GlycosRecordStore {
pub fn new(graph: PersistentGraph, _local_peer_id: PeerId) -> Self {
GlycosRecordStore { graph }
}
}
/// A unit of graph data carried as the value of a Kademlia [`Record`].
///
/// Values are encoded with `bincode` by [`GlycosRecord::to_record`].
// NOTE(review): the bincode encoding presumably depends on the variant order,
// so reordering variants would likely change the wire format — confirm.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub enum GlycosRecord {
/// A single vertex.
PutVertex(Vertex),
/// A single edge.
PutEdge(Edge),
/// A vertex together with a list of edges; `identifier` debug-asserts that
/// every edge passes `verify_is_subject` for this vertex.
FullRecord(Vertex, Vec<Edge>),
}
impl GlycosRecord {
    /// Builds the Kademlia [`Record`] for this value.
    ///
    /// The record key is [`GlycosRecord::identifier`]; the record value is the
    /// `bincode` serialization of `self`.
    ///
    /// # Panics
    ///
    /// Panics if serialization fails, or if `identifier` panics.
    pub fn to_record(&self) -> Record {
        let key = Key::new(&self.identifier());
        let value = bincode::serialize(&self).expect("infallible encoding");
        Record::new(key, value)
    }

    /// Returns the 32-byte identifier this value is keyed under.
    ///
    /// For `PutVertex` and `FullRecord` this is the vertex identifier; for
    /// `PutEdge` it is the first item yielded by the edge's `vertex_keys()`.
    ///
    /// # Panics
    ///
    /// Panics if a `PutEdge` yields no vertex key. In debug builds it also
    /// panics if a `FullRecord` contains an edge that fails
    /// `verify_is_subject` for its vertex.
    pub fn identifier(&self) -> [u8; 32] {
        // Edges are keyed directly; the other two variants share the vertex path.
        let vertex = match self {
            Self::PutEdge(edge) => return edge.vertex_keys().next().unwrap(),
            Self::PutVertex(vertex) => vertex,
            Self::FullRecord(v, e) => {
                debug_assert!(e.iter().all(|x| x.verify_is_subject(&v)));
                v
            }
        };
        vertex.identifier()
    }
}
fn map_store_error(e: impl std::error::Error) -> StoreError {
log::warn!("Store error; returning ValueTooLarge: {}", e);
StoreError::ValueTooLarge
}
impl<'a> RecordStore<'a> for GlycosRecordStore {
type RecordsIter = Box<dyn Iterator<Item = Cow<'a, Record>> + 'a>;
type ProvidedIter = Box<dyn Iterator<Item = Cow<'a, ProviderRecord>> + 'a>;
fn get(&'a self, k: &Key) -> Option<Cow<'_, Record>> {
let record = self.graph.find_vertex(&k);
let (vertex, edges) = match record {
Ok((v, e)) => (v?, e?),
Err(e) => {
log::error!("Error fetching vertex from storage: {:?}", e);
return None;
}
};
let edges: Result<Vec<_>, _> = edges.collect();
let edges = match edges {
Ok(v) => v,
Err(e) => {
log::error!("Error fetching edges from storage: {:?}", e);
return None;
}
};
Some(Cow::Owned(Record::new(
Key::new(&vertex.identifier()),
bincode::serialize(&(vertex, edges)).expect("infallible encoding"),
)))
}
fn put(&'a mut self, r: Record) -> Result<(), StoreError> {
let g: GlycosRecord = bincode::deserialize(&r.value).map_err(map_store_error)?;
match g {
GlycosRecord::PutVertex(new_vertex) => {
if !new_vertex.verify_signature() {
return Err(StoreError::ValueTooLarge);
}
if let Ok((Some(existing_vertex), _e)) =
self.graph.find_vertex(new_vertex.identifier())
{
if existing_vertex.clock >= new_vertex.clock {
log::warn!("Not overwriting older vertex.");
return Ok(());
}
}
self.graph
.persist_vertex(&new_vertex)
.map_err(map_store_error)
}
GlycosRecord::PutEdge(e) => self.graph.persist_edge(&e).map_err(map_store_error),
GlycosRecord::FullRecord(new_vertex, edges) => {
if !new_vertex.verify_signature() {
return Err(StoreError::ValueTooLarge);
}
if let Ok((Some(existing_vertex), _e)) =
self.graph.find_vertex(new_vertex.identifier())
{
if existing_vertex.clock >= new_vertex.clock {
log::warn!("Not overwriting older vertex.");
return Ok(());
}
}
self.graph
.persist_vertex(&new_vertex)
.map_err(map_store_error)?;
for e in &edges {
self.graph.persist_edge(&e).map_err(map_store_error)?;
}
Ok(())
}
}
}
fn remove(&'a mut self, _k: &Key) {
todo!()
}
fn records(&'a self) -> Self::RecordsIter {
Box::new(self.graph.vertices().map(move |(v, edges)| {
Cow::Owned(GlycosRecord::FullRecord(v, edges.collect()).to_record())
}))
}
fn add_provider(&'a mut self, _record: ProviderRecord) -> Result<(), StoreError> {
todo!()
}
fn providers(&'a self, _key: &Key) -> Vec<ProviderRecord> {
todo!()
}
fn provided(&'a self) -> Self::ProvidedIter {
todo!()
}
fn remove_provider(&'a mut self, _k: &Key, _p: &PeerId) {
todo!()
}
}