#[cfg(feature = "server")]
/// Outbound handle for a single peer.
///
/// Holds the sending half of the channel whose receiving half is drained by the
/// writer task created in `spawn_writer_task`.
pub struct PeerConnection {
    /// Channel sender carrying already-encoded message bytes for this peer.
    pub sender: tokio::sync::mpsc::Sender<Vec<u8>>,
}
12
#[cfg(feature = "server")]
/// Registry of outbound peer connections, keyed by the peer's socket address.
pub struct NetworkManager {
    /// Counter maintained alongside `connection_pool` by the `impl` below.
    pub nb_active_connections: u16,
    /// One `PeerConnection` per remote address we have connected to.
    pub connection_pool: std::collections::HashMap<std::net::SocketAddr, PeerConnection>,
}
21
#[cfg(feature = "server")]
impl NetworkManager {
    /// Creates a manager with an empty connection pool and a zero connection count.
    pub fn new() -> Self {
        Self {
            nb_active_connections: 0,
            connection_pool: std::collections::HashMap::new(),
        }
    }

    /// Registers (or replaces) the outbound channel for `site_addr`.
    ///
    /// The active-connection counter only grows when the address was not already
    /// present, so replacing a stale entry does not count the same peer twice.
    fn add_connection(
        &mut self,
        site_addr: std::net::SocketAddr,
        sender: tokio::sync::mpsc::Sender<Vec<u8>>,
    ) {
        let previous = self
            .connection_pool
            .insert(site_addr, PeerConnection { sender });
        if previous.is_none() {
            // Saturate instead of overflowing the `u16` counter.
            self.nb_active_connections = self.nb_active_connections.saturating_add(1);
        }
    }

    /// Opens a TCP connection to `site_addr`, starts its writer task and stores the
    /// channel sender in the pool.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from `TcpStream::connect` when the peer cannot be reached.
    pub async fn create_connection(
        &mut self,
        site_addr: std::net::SocketAddr,
    ) -> Result<(), Box<dyn std::error::Error>> {
        use tokio::net::TcpStream;
        use tokio::sync::mpsc;

        let stream = TcpStream::connect(site_addr).await?;
        let (tx, rx) = mpsc::channel(256);
        spawn_writer_task(stream, rx).await;
        self.add_connection(site_addr, tx);
        Ok(())
    }

    /// Drops the pooled connection for `site_addr`, if any, and keeps the
    /// active-connection counter in step with the pool.
    pub fn remove_connection(&mut self, site_addr: &std::net::SocketAddr) {
        if self.connection_pool.remove(site_addr).is_some() {
            self.nb_active_connections = self.nb_active_connections.saturating_sub(1);
        }
    }

    /// Returns a clone of the channel sender for `addr`, or `None` when no
    /// connection to that address is pooled.
    pub fn get_sender(
        &self,
        addr: &std::net::SocketAddr,
    ) -> Option<tokio::sync::mpsc::Sender<Vec<u8>>> {
        self.connection_pool.get(addr).map(|p| p.sender.clone())
    }
}
71
#[cfg(feature = "server")]
lazy_static::lazy_static! {
    /// Shared `NetworkManager` instance, wrapped in an `Arc` and guarded by a
    /// `tokio::sync::Mutex`; `send_message` locks it to look up or create connections.
    pub static ref NETWORK_MANAGER: std::sync::Arc<tokio::sync::Mutex<NetworkManager>> =
        std::sync::Arc::new(tokio::sync::Mutex::new(NetworkManager::new()));
}
77
#[cfg(feature = "server")]
/// Spawns a background task that forwards every buffer received on `rx` to `stream`.
///
/// The task ends when the channel is closed (all senders dropped) or when a write
/// fails; in the latter case the I/O error is logged. The function itself returns as
/// soon as the task has been spawned.
pub async fn spawn_writer_task(
    stream: tokio::net::TcpStream,
    mut rx: tokio::sync::mpsc::Receiver<Vec<u8>>,
) {
    use tokio::io::AsyncWriteExt;

    tokio::spawn(async move {
        let mut stream = stream;
        while let Some(data) = rx.recv().await {
            // Keep the underlying I/O error so a failed write can be diagnosed.
            if let Err(e) = stream.write_all(&data).await {
                log::error!("Failed to send message: {}", e);
                break;
            }
        }
        log::debug!("Writer task closed.");
    });
}
97
#[cfg(feature = "server")]
/// Sends a `Discovery` message to every candidate peer and records how many sends
/// succeeded.
///
/// Candidates are the peers returned by `get_cli_peers_addrs` when that list is not
/// empty; otherwise every port in `start_port..=end_port` on `ip`, except
/// `selected_port`.
///
/// When there is at least one candidate, `init_sync(true)` is called on the local
/// state, the discovery messages are sent concurrently, and the number of successful
/// sends is passed to `init_nb_first_attended_neighbours`. With no candidate the
/// function returns without touching the state.
///
/// An `ip` that does not form a valid socket address is logged and treated as
/// "no peers" instead of panicking.
pub async fn announce(ip: &str, start_port: u16, end_port: u16, selected_port: u16) {
    use crate::message::{MessageInfo, NetworkMessageCode};
    use crate::state::LOCAL_APP_STATE;

    let (local_addr, site_id, clocks, cli_peers) = {
        let state = LOCAL_APP_STATE.lock().await;
        (
            state.get_site_addr(),
            state.get_site_id(),
            state.get_clock(),
            state.get_cli_peers_addrs(),
        )
    };

    let peer_to_ping: Vec<std::net::SocketAddr> = if !cli_peers.is_empty() {
        log::debug!("Manually connecting to peers based on args");
        cli_peers
    } else {
        log::debug!("Looking for all ports to find potential peers");
        // Collect fallibly: the first unparsable address aborts the scan with one
        // log line rather than a panic.
        let scanned: Result<Vec<std::net::SocketAddr>, std::net::AddrParseError> =
            (start_port..=end_port)
                .filter(|&port| port != selected_port)
                .map(|port| format!("{}:{}", ip, port).parse())
                .collect();
        match scanned {
            Ok(addrs) => addrs,
            Err(e) => {
                log::error!("Cannot scan for peers on invalid address {:?}: {}", ip, e);
                return;
            }
        }
    };

    if peer_to_ping.is_empty() {
        return;
    }

    {
        let mut state = LOCAL_APP_STATE.lock().await;
        state.init_sync(true);
    }

    // One task per peer; each task reports whether its discovery message was sent.
    let handles: Vec<_> = peer_to_ping
        .into_iter()
        .map(|addr| {
            let site_id = site_id.clone();
            let clocks = clocks.clone();
            tokio::spawn(async move {
                let delivered = send_message(
                    addr,
                    MessageInfo::None,
                    None,
                    NetworkMessageCode::Discovery,
                    local_addr,
                    &site_id,
                    &site_id,
                    local_addr,
                    clocks,
                )
                .await
                .is_ok();
                delivered
            })
        })
        .collect();

    // A task that panicked or failed to send does not count as a success.
    let mut success_count: i64 = 0;
    for handle in handles {
        if matches!(handle.await, Ok(true)) {
            success_count += 1;
        }
    }

    {
        let mut state = LOCAL_APP_STATE.lock().await;
        state.init_nb_first_attended_neighbours(success_count);
    }
}
181
#[cfg(feature = "server")]
/// Hands an accepted connection over to a background task running
/// `handle_network_message`, logging any error that task ends with.
pub async fn start_listening(stream: tokio::net::TcpStream, addr: std::net::SocketAddr) {
    log::debug!("Accepted connection from: {}", addr);

    let session = async move {
        match handle_network_message(stream, addr).await {
            Ok(()) => {}
            Err(e) => log::error!("Error handling connection from {}: {}", addr, e),
        }
    };
    tokio::spawn(session);
}
193
#[cfg(feature = "server")]
/// Reads messages from one peer connection and dispatches each on its
/// `NetworkMessageCode`.
///
/// Runs a loop on `stream`: the bytes returned by each `read` are decoded with
/// `rmp_serde` as one `Message` (a read that fails to decode is logged and skipped),
/// the initiator is registered with `add_site_id`, the arm matching `message.code`
/// runs, and finally the local clock is updated from the message clock.
///
/// Returns `Ok(())` when the peer closes the connection (zero-byte read), after
/// calling `remove_peer_from_socket_closed`. Returns `Err` when the read itself fails
/// or when any call followed by `?` inside an arm fails; either ends the loop.
///
/// NOTE(review): there is no framing and the buffer is 1024 bytes, so this appears to
/// assume every `read` yields exactly one complete message — confirm for large
/// payloads such as snapshot responses.
pub async fn handle_network_message(
    mut stream: tokio::net::TcpStream,
    socket_of_the_sender: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
    use crate::message::{Message, MessageInfo, NetworkMessageCode};
    use crate::state::LOCAL_APP_STATE;
    use rmp_serde::decode;
    use tokio::io::AsyncReadExt;

    let mut buf = vec![0; 1024];
    loop {
        let n = stream.read(&mut buf).await?;

        // A zero-byte read means the peer closed the connection.
        if n == 0 {
            log::warn!("Connection closed by: {}", socket_of_the_sender);
            {
                log::debug!("Removing {} from the peers", socket_of_the_sender);
                let mut state = LOCAL_APP_STATE.lock().await;
                state
                    .remove_peer_from_socket_closed(socket_of_the_sender)
                    .await;
            }
            return Ok(());
        }

        log::debug!("Received {} bytes from {}", n, socket_of_the_sender);

        // Undecodable input is dropped and the loop keeps reading.
        let message: Message = match decode::from_slice(&buf[..n]) {
            Ok(msg) => msg,
            Err(e) => {
                log::error!("Error decoding message: {}", e);
                continue;
            }
        };

        log::debug!(
            "Message received from site {} : {:?}",
            message.sender_addr,
            message.clone()
        );

        // Register the initiator's id/address pair before handling the message.
        {
            let mut state = LOCAL_APP_STATE.lock().await;
            state.add_site_id(
                message.message_initiator_id.clone(),
                message.message_initiator_addr.clone(),
            );
        }

        match message.code {
            NetworkMessageCode::AcquireMutex => {
                // Record the initiator's request in the global mutex FIFO, stamped
                // with the Lamport value carried by the message.
                {
                    let mut st = LOCAL_APP_STATE.lock().await;
                    st.global_mutex_fifo.insert(
                        message.message_initiator_id.clone(),
                        crate::state::MutexStamp {
                            tag: crate::state::MutexTag::Request,
                            date: message.clock.get_lamport().clone(),
                        },
                    );
                }
                let mut diffuse = false;
                let (local_site_id, local_site_addr) = {
                    let mut state = LOCAL_APP_STATE.lock().await;
                    // `0.0.0.0:0` is used as the "no parent recorded" placeholder
                    // for this initiator's wave (it is also what the resets below store).
                    let parent_id = state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id)
                        .unwrap_or(&"0.0.0.0:0".parse().unwrap())
                        .to_string();
                    if parent_id == "0.0.0.0:0" {
                        // First message of this wave: the sender becomes our parent.
                        state.set_parent_addr(
                            message.message_initiator_id.clone(),
                            message.sender_addr,
                        );

                        // Expected replies = stored counter (or the neighbour count
                        // when none is stored) minus one.
                        let nb_neighbours = state.get_nb_connected_neighbours();
                        let current_value = state
                            .attended_neighbours_nb_for_transaction_wave
                            .get(&message.message_initiator_id)
                            .copied()
                            .unwrap_or(nb_neighbours);

                        state
                            .attended_neighbours_nb_for_transaction_wave
                            .insert(message.message_initiator_id.clone(), current_value - 1);

                        log::debug!("Nombre de voisin : {}", current_value - 1);

                        // Forward only if replies are still expected.
                        diffuse = state
                            .attended_neighbours_nb_for_transaction_wave
                            .get(&message.message_initiator_id)
                            .copied()
                            .unwrap_or(0)
                            > 0;
                    }
                    (state.get_site_id(), state.get_site_addr())
                };

                if diffuse {
                    // Re-send the message to the neighbours with ourselves as sender.
                    let mut snd_msg = message.clone();
                    snd_msg.sender_id = local_site_id.to_string();
                    snd_msg.sender_addr = local_site_addr;
                    diffuse_message(&snd_msg).await?;
                } else {
                    // Nothing to forward: acknowledge directly to the sender.
                    let (parent_addr, local_addr, site_id) = {
                        let state = LOCAL_APP_STATE.lock().await;
                        (
                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
                            &state.get_site_addr(),
                            &state.get_site_id().to_string(),
                        )
                    };
                    log::debug!(
                        "Réception d'un message de d'acquisition de mutex, on est sur une feuille, on acquite, envoie à {}",
                        message.sender_addr.to_string().as_str()
                    );
                    send_message(
                        message.sender_addr,
                        MessageInfo::AckMutex(crate::message::AckMutexPayload {
                            clock: message.clock.get_lamport().clone(),
                        }),
                        None,
                        NetworkMessageCode::AckGlobalMutex,
                        *local_addr,
                        site_id,
                        &message.message_initiator_id,
                        message.message_initiator_addr,
                        message.clock.clone(),
                    )
                    .await?;

                    // If the sender is our recorded parent, reset the wave state
                    // (counter back to the neighbour count, parent back to the placeholder).
                    if message.sender_addr == parent_addr {
                        let mut state = LOCAL_APP_STATE.lock().await;
                        let peer_count = state.get_nb_connected_neighbours();
                        state
                            .attended_neighbours_nb_for_transaction_wave
                            .insert(message.message_initiator_id.clone(), peer_count as i64);
                        state.parent_addr_for_transaction_wave.insert(
                            message.message_initiator_id.clone(),
                            "0.0.0.0:0".parse().unwrap(),
                        );
                    }
                }
            }

            NetworkMessageCode::AckGlobalMutex => {
                // The state lock taken here stays held for the whole arm, including
                // the `send_message` await below.
                let mut state = LOCAL_APP_STATE.lock().await;

                // One fewer reply expected for this initiator's wave.
                let nb_neighbours = state.get_nb_connected_neighbours();
                let current_value = state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id)
                    .copied()
                    .unwrap_or(nb_neighbours);
                state
                    .attended_neighbours_nb_for_transaction_wave
                    .insert(message.message_initiator_id.clone(), current_value - 1);

                // Counter at zero: every expected reply has arrived.
                if state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id.clone())
                    .copied()
                    .unwrap_or(-1)
                    == 0
                {
                    // Recorded parent equal to our own address: the wave ends here
                    // (presumably we initiated it — confirm where that parent is set).
                    if state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id.clone())
                        .copied()
                        .unwrap_or("99.99.99.99:0".parse().unwrap())
                        == state.get_site_addr()
                    {
                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
                        state.try_enter_sc();
                    } else {
                        // Otherwise pass the acknowledgement up to our parent.
                        log::debug!(
                            "On est de le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
                            state.get_site_addr(),
                            state
                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
                                .to_string()
                                .as_str()
                        );
                        send_message(
                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
                            MessageInfo::AckMutex(crate::message::AckMutexPayload {
                                clock: message.clock.get_lamport().clone(),
                            }),
                            None,
                            NetworkMessageCode::AckGlobalMutex,
                            state.get_site_addr(),
                            &state.get_site_id().to_string(),
                            &message.message_initiator_id,
                            message.message_initiator_addr,
                            state.get_clock().clone(),
                        )
                        .await?;
                    }

                    // Reset the wave state for this initiator.
                    let peer_count = state.get_nb_connected_neighbours();
                    state
                        .attended_neighbours_nb_for_transaction_wave
                        .insert(message.message_initiator_id.clone(), peer_count as i64);
                    state.parent_addr_for_transaction_wave.insert(
                        message.message_initiator_id.clone(),
                        "0.0.0.0:0".parse().unwrap(),
                    );
                }
            }

            NetworkMessageCode::AckReleaseGlobalMutex => {
                // Same countdown as `AckGlobalMutex`, but the forwarded message is an
                // `AckReleaseGlobalMutex` with no payload.
                let mut state = LOCAL_APP_STATE.lock().await;

                let nb_neighbours = state.get_nb_connected_neighbours();
                let current_value = state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id)
                    .copied()
                    .unwrap_or(nb_neighbours);
                state
                    .attended_neighbours_nb_for_transaction_wave
                    .insert(message.message_initiator_id.clone(), current_value - 1);

                if state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id.clone())
                    .copied()
                    .unwrap_or(-1)
                    == 0
                {
                    if state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id.clone())
                        .copied()
                        .unwrap_or("99.99.99.99:0".parse().unwrap())
                        == state.get_site_addr()
                    {
                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
                        state.try_enter_sc();
                    } else {
                        log::debug!(
                            "On est de le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
                            state.get_site_addr(),
                            state
                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
                                .to_string()
                                .as_str()
                        );
                        send_message(
                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
                            MessageInfo::None,
                            None,
                            NetworkMessageCode::AckReleaseGlobalMutex,
                            state.get_site_addr(),
                            &state.get_site_id().to_string(),
                            &message.message_initiator_id,
                            message.message_initiator_addr,
                            state.get_clock().clone(),
                        )
                        .await?;
                    }

                    // Reset the wave state for this initiator.
                    let peer_count = state.get_nb_connected_neighbours();
                    state
                        .attended_neighbours_nb_for_transaction_wave
                        .insert(message.message_initiator_id.clone(), peer_count as i64);
                    state.parent_addr_for_transaction_wave.insert(
                        message.message_initiator_id.clone(),
                        "0.0.0.0:0".parse().unwrap(),
                    );
                }
            }

            NetworkMessageCode::ReleaseGlobalMutex => {
                // Drop the initiator's entry from the FIFO, then call `try_enter_sc`.
                {
                    let mut st = LOCAL_APP_STATE.lock().await;
                    st.global_mutex_fifo.remove(&message.message_initiator_id);
                    st.try_enter_sc();
                }
                // The rest mirrors the `AcquireMutex` forwarding logic.
                let mut diffuse = false;
                let (local_site_id, local_site_addr) = {
                    let mut state = LOCAL_APP_STATE.lock().await;
                    let parent_id = state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id)
                        .unwrap_or(&"0.0.0.0:0".parse().unwrap())
                        .to_string();
                    if parent_id == "0.0.0.0:0" {
                        state.set_parent_addr(
                            message.message_initiator_id.clone(),
                            message.sender_addr,
                        );

                        let nb_neighbours = state.get_nb_connected_neighbours();
                        let current_value = state
                            .attended_neighbours_nb_for_transaction_wave
                            .get(&message.message_initiator_id)
                            .copied()
                            .unwrap_or(nb_neighbours);

                        state
                            .attended_neighbours_nb_for_transaction_wave
                            .insert(message.message_initiator_id.clone(), current_value - 1);

                        log::debug!("Nombre de voisin : {}", current_value - 1);

                        diffuse = state
                            .attended_neighbours_nb_for_transaction_wave
                            .get(&message.message_initiator_id)
                            .copied()
                            .unwrap_or(0)
                            > 0;
                    }
                    (state.get_site_id(), state.get_site_addr())
                };

                if diffuse {
                    let mut snd_msg = message.clone();
                    snd_msg.sender_id = local_site_id.to_string();
                    snd_msg.sender_addr = local_site_addr;
                    diffuse_message(&snd_msg).await?;
                } else {
                    let (parent_addr, local_addr, site_id) = {
                        let state = LOCAL_APP_STATE.lock().await;
                        (
                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
                            &state.get_site_addr(),
                            &state.get_site_id().to_string(),
                        )
                    };
                    log::debug!(
                        "Réception d'un message de relachement de mutex global, on est sur une feuille, on acquite, envoie à {}",
                        message.sender_addr.to_string().as_str()
                    );
                    send_message(
                        message.sender_addr,
                        MessageInfo::None,
                        None,
                        NetworkMessageCode::AckReleaseGlobalMutex,
                        *local_addr,
                        site_id,
                        &message.message_initiator_id,
                        message.message_initiator_addr,
                        message.clock.clone(),
                    )
                    .await?;

                    if message.sender_addr == parent_addr {
                        let mut state = LOCAL_APP_STATE.lock().await;
                        let peer_count = state.get_nb_connected_neighbours();
                        state
                            .attended_neighbours_nb_for_transaction_wave
                            .insert(message.message_initiator_id.clone(), peer_count as i64);
                        state.parent_addr_for_transaction_wave.insert(
                            message.message_initiator_id.clone(),
                            "0.0.0.0:0".parse().unwrap(),
                        );
                    }
                }
            }

            NetworkMessageCode::Discovery => {
                let mut state = LOCAL_APP_STATE.lock().await;

                state.add_incomming_peer(
                    message.message_initiator_addr,
                    socket_of_the_sender,
                    message.clock.clone(),
                );

                // Reply only when the sender is among the connected neighbours.
                if state
                    .get_connected_nei_addr()
                    .iter()
                    .find(|addr| addr == &&message.sender_addr)
                    .is_some()
                {
                    // NOTE(review): this repeats the lookup above with the opposite
                    // test, so the body looks unreachable — confirm the intended condition.
                    if state
                        .get_connected_nei_addr()
                        .iter()
                        .find(|addr| addr == &&message.sender_addr)
                        .is_none()
                    {
                        state.add_connected_neighbour(message.sender_addr);
                    }
                    // The acknowledgement carries a copy of our global mutex FIFO.
                    send_message(
                        message.sender_addr,
                        MessageInfo::Acknowledge(crate::message::AcknowledgePayload {
                            global_fifo: state.get_global_mutex_fifo().clone(),
                        }),
                        None,
                        NetworkMessageCode::Acknowledgment,
                        state.get_site_addr(),
                        state.get_site_id().as_str(),
                        &message.message_initiator_id.clone(),
                        message.message_initiator_addr,
                        state.get_clock(),
                    )
                    .await?;
                }
            }

            NetworkMessageCode::Acknowledgment => {
                let ready_to_sync = {
                    let mut state = LOCAL_APP_STATE.lock().await;
                    state.add_incomming_peer(
                        message.sender_addr,
                        socket_of_the_sender,
                        message.clock.clone(),
                    );
                    // When we are the initiator, bump every counter returned by
                    // `get_nb_nei_for_wave` by one.
                    if message.message_initiator_addr == state.get_site_addr() {
                        for (site_id, nb_a_i) in state.get_nb_nei_for_wave().iter() {
                            state
                                .attended_neighbours_nb_for_transaction_wave
                                .insert(site_id.clone(), *nb_a_i + 1);
                        }
                    }
                    // Ready once sync is flagged and the connected-neighbour count
                    // matches the count recorded by `init_nb_first_attended_neighbours`.
                    state.get_sync()
                        && state.get_nb_first_attended_neighbours()
                            == state.get_nb_connected_neighbours()
                };

                // Adopt the FIFO sent by the peer, if the payload carries one.
                let global_fifo = match &message.info {
                    MessageInfo::Acknowledge(payload) => Some(payload.global_fifo.clone()),
                    _ => None,
                };

                if let Some(global_fifo) = global_fifo {
                    let mut state = LOCAL_APP_STATE.lock().await;
                    state.set_global_mutex_fifo(global_fifo);
                }

                if ready_to_sync {
                    log::info!("All neighbours have responded, starting synchronization");
                    crate::control::enqueue_critical(
                        crate::control::CriticalCommands::SyncSnapshot,
                    )
                    .await?;
                }
            }

            NetworkMessageCode::Transaction => {
                if message.command.is_some() {
                    // A failure to process the command is logged, not propagated.
                    if let Err(e) = crate::control::process_network_command(
                        message.info.clone(),
                        message.clock.clone(),
                        message.message_initiator_id.as_str(),
                    )
                    .await
                    {
                        log::error!("Error handling command:\n{}", e);
                    }
                    // Forwarding logic: same shape as the `AcquireMutex` arm.
                    let mut diffuse = false;
                    let (local_site_id, local_site_addr) = {
                        let mut state = LOCAL_APP_STATE.lock().await;
                        let parent_id = state
                            .parent_addr_for_transaction_wave
                            .get(&message.message_initiator_id.clone())
                            .unwrap_or(&"0.0.0.0:0".parse().unwrap())
                            .to_string();
                        if parent_id == "0.0.0.0:0" {
                            state.set_parent_addr(
                                message.message_initiator_id.clone(),
                                message.sender_addr,
                            );

                            let nb_neighbours = state.get_nb_connected_neighbours();
                            let current_value = state
                                .attended_neighbours_nb_for_transaction_wave
                                .get(&message.message_initiator_id)
                                .copied()
                                .unwrap_or(nb_neighbours);

                            state
                                .attended_neighbours_nb_for_transaction_wave
                                .insert(message.message_initiator_id.clone(), current_value - 1);

                            log::debug!("Nombre de voisin : {}", current_value - 1);

                            diffuse = state
                                .attended_neighbours_nb_for_transaction_wave
                                .get(&message.message_initiator_id.clone())
                                .copied()
                                .unwrap_or(0)
                                > 0;
                        }
                        (state.get_site_id(), state.get_site_addr())
                    };

                    if diffuse {
                        let mut snd_msg = message.clone();
                        snd_msg.sender_id = local_site_id.to_string();
                        snd_msg.sender_addr = local_site_addr;
                        diffuse_message(&snd_msg).await?;
                    } else {
                        let (parent_addr, local_addr, site_id) = {
                            let state = LOCAL_APP_STATE.lock().await;
                            (
                                state
                                    .get_parent_addr_for_wave(message.message_initiator_id.clone()),
                                state.get_site_addr(),
                                state.get_site_id().to_string(),
                            )
                        };
                        log::debug!(
                            "Réception d'un message de transaction, on est sur une feuille, on acquite, envoie à {}",
                            message.sender_addr.to_string().as_str()
                        );
                        send_message(
                            message.sender_addr,
                            MessageInfo::None,
                            None,
                            NetworkMessageCode::TransactionAcknowledgement,
                            local_addr,
                            site_id.as_str(),
                            &message.message_initiator_id.clone(),
                            message.message_initiator_addr,
                            message.clock.clone(),
                        )
                        .await?;

                        if message.sender_addr == parent_addr {
                            let mut state = LOCAL_APP_STATE.lock().await;
                            let peer_count = state.get_nb_connected_neighbours();
                            state
                                .attended_neighbours_nb_for_transaction_wave
                                .insert(message.message_initiator_id.clone(), peer_count as i64);
                            // Moves `message_initiator_id` out of `message`; only
                            // `message.clock` is read after this point.
                            state
                                .parent_addr_for_transaction_wave
                                .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
                        }
                    }
                } else {
                    log::error!("Command is None for Transaction message");
                }
            }
            NetworkMessageCode::TransactionAcknowledgement => {
                // Set when the wave ends at this site; gates `release_mutex` below.
                let mut should_reset = false;

                let mut state = LOCAL_APP_STATE.lock().await;

                // One fewer reply expected for this initiator's wave.
                let nb_neighbours = state.get_nb_connected_neighbours();
                let current_value = state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id)
                    .copied()
                    .unwrap_or(nb_neighbours);
                state
                    .attended_neighbours_nb_for_transaction_wave
                    .insert(message.message_initiator_id.clone(), current_value - 1);

                if state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id)
                    .copied()
                    .unwrap_or(-1)
                    == 0
                {
                    if state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id)
                        .copied()
                        .unwrap_or("99.99.99.99:0".parse().unwrap())
                        == state.get_site_addr()
                    {
                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
                        should_reset = true;
                    } else {
                        // Pass the acknowledgement up to our parent.
                        log::debug!(
                            "On est dans le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
                            state.get_site_addr().to_string().as_str(),
                            state
                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
                                .to_string()
                                .as_str()
                        );
                        send_message(
                            state.get_parent_addr_for_wave(message.message_initiator_id.clone()),
                            MessageInfo::None,
                            None,
                            NetworkMessageCode::TransactionAcknowledgement,
                            state.get_site_addr(),
                            &state.get_site_id().to_string(),
                            &message.message_initiator_id,
                            message.message_initiator_addr,
                            state.get_clock(),
                        )
                        .await?;
                    }

                    // Reset the wave state for this initiator.
                    let peer_count = state.get_nb_connected_neighbours();
                    state
                        .attended_neighbours_nb_for_transaction_wave
                        .insert(message.message_initiator_id.clone(), peer_count as i64);
                    state
                        .parent_addr_for_transaction_wave
                        .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());

                    // Release the mutex only when the wave ended here and no command
                    // is still pending.
                    if should_reset && state.pending_commands.len() == 0 {
                        state.release_mutex().await?;
                    };
                }
            }

            NetworkMessageCode::Error => {
                log::debug!("Error message received: {:?}", message);
            }
            NetworkMessageCode::Disconnect => {
                {
                    let mut state = LOCAL_APP_STATE.lock().await;
                    state.remove_peer(message.message_initiator_addr).await;
                }
                println!(
                    "\x1b[1;31mSITE {} DISCONNECTED !\x1b[0m",
                    message.message_initiator_id
                );
            }
            NetworkMessageCode::SnapshotRequest => {
                // Forwarding logic: same shape as the `AcquireMutex` arm.
                let mut diffuse = false;
                let (local_site_id, local_site_addr) = {
                    let mut state = LOCAL_APP_STATE.lock().await;
                    let parent_id = state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id.clone())
                        .unwrap_or(&"0.0.0.0:0".parse().unwrap())
                        .to_string();
                    if parent_id == "0.0.0.0:0" {
                        state.set_parent_addr(
                            message.message_initiator_id.clone(),
                            message.sender_addr,
                        );

                        let nb_neighbours = state.get_nb_connected_neighbours();
                        let current_value = state
                            .attended_neighbours_nb_for_transaction_wave
                            .get(&message.message_initiator_id)
                            .copied()
                            .unwrap_or(nb_neighbours);

                        state
                            .attended_neighbours_nb_for_transaction_wave
                            .insert(message.message_initiator_id.clone(), current_value - 1);

                        log::debug!("Nombre de voisin : {}", current_value - 1);

                        diffuse = state
                            .attended_neighbours_nb_for_transaction_wave
                            .get(&message.message_initiator_id.clone())
                            .copied()
                            .unwrap_or(0)
                            > 0;
                    }
                    (state.get_site_id(), state.get_site_addr())
                };

                if diffuse {
                    let mut snd_msg = message.clone();
                    snd_msg.sender_id = local_site_id.to_string();
                    snd_msg.sender_addr = local_site_addr;
                    log::debug!(
                        "We are not on a leaf, we start our own global snapshot construction and diffuse the request to other nodes"
                    );
                    // Start a snapshot in network mode, then forward the request.
                    crate::snapshot::start_snapshot(crate::snapshot::SnapshotMode::NetworkMode)
                        .await?;
                    diffuse_message(&snd_msg).await?;
                } else {
                    let parent_addr = {
                        let state = LOCAL_APP_STATE.lock().await;
                        state.get_parent_addr_for_wave(message.message_initiator_id.clone())
                    };
                    log::debug!(
                        "Réception d'une demande de snapshot, on est sur une feuille, on crée un snapshot local, on envoie à {}",
                        message.sender_addr.to_string().as_str()
                    );
                    // Answer with our own transaction log, converted to summaries.
                    let txs = crate::db::get_local_transaction_log()?;
                    let summaries: Vec<_> = txs.iter().map(|t| t.into()).collect();

                    let (site_id, clock, local_addr) = {
                        let st = LOCAL_APP_STATE.lock().await;
                        (st.get_site_id(), st.get_clock(), st.get_site_addr())
                    };

                    send_message(
                        message.sender_addr,
                        MessageInfo::SnapshotResponse(crate::message::SnapshotResponse {
                            site_id: site_id.clone(),
                            clock: clock.clone(),
                            tx_log: summaries,
                        }),
                        None,
                        NetworkMessageCode::SnapshotResponse,
                        local_addr,
                        &site_id,
                        &message.message_initiator_id,
                        message.message_initiator_addr,
                        clock.clone(),
                    )
                    .await?;

                    if message.sender_addr == parent_addr {
                        let mut state = LOCAL_APP_STATE.lock().await;
                        let peer_count = state.get_nb_connected_neighbours();
                        state
                            .attended_neighbours_nb_for_transaction_wave
                            .insert(message.message_initiator_id.clone(), peer_count as i64);
                        state
                            .parent_addr_for_transaction_wave
                            .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
                    }
                }
            }
            NetworkMessageCode::SnapshotResponse => {
                // Set when the wave ends at this site; gates `release_mutex` below.
                let mut should_reset = false;
                let mut state = LOCAL_APP_STATE.lock().await;

                // One fewer reply expected for this initiator's wave.
                let nb_neighbours = state.get_nb_connected_neighbours();
                let current_value = state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id)
                    .copied()
                    .unwrap_or(nb_neighbours);
                state
                    .attended_neighbours_nb_for_transaction_wave
                    .insert(message.message_initiator_id.clone(), current_value - 1);

                if state
                    .attended_neighbours_nb_for_transaction_wave
                    .get(&message.message_initiator_id)
                    .copied()
                    .unwrap_or(-1)
                    == 0
                {
                    if state
                        .parent_addr_for_transaction_wave
                        .get(&message.message_initiator_id)
                        .copied()
                        .unwrap_or("99.99.99.99:0".parse().unwrap())
                        == state.get_site_addr()
                    {
                        log::debug!(
                            "L'initiateur à reçu toutes ses snapshots, il devrait créer sa snapshot globale"
                        );
                        // Push the last response; what happens to a completed global
                        // snapshot depends on the manager's mode.
                        if let MessageInfo::SnapshotResponse(resp) = message.info {
                            let mut mgr = crate::snapshot::LOCAL_SNAPSHOT_MANAGER.lock().await;
                            if mgr.mode == crate::snapshot::SnapshotMode::FileMode {
                                log::debug!("La snapshot devrait être sauvegardée");
                                if let Some(gs) = mgr.push(resp) {
                                    log::info!(
                                        "Global snapshot ready to save, hold per site : {:#?}",
                                        gs.missing
                                    );
                                    // NOTE(review): `unwrap` panics if `persist` does
                                    // not succeed; a parse failure just leaves `None`.
                                    mgr.path = crate::snapshot::persist(&gs, state.get_site_id())
                                        .await
                                        .unwrap()
                                        .parse()
                                        .ok();
                                }
                            } else if mgr.mode == crate::snapshot::SnapshotMode::SyncMode {
                                log::debug!(
                                    "La snapshot devrait être utilisée pour la synchronisation"
                                );
                                if let Some(gs) = mgr.push(resp) {
                                    log::info!(
                                        "Global snapshot ready to be synced, hold per site : {:#?}",
                                        gs.missing
                                    );
                                    crate::db::update_db_with_snapshot(
                                        &gs,
                                        state.get_clock().get_vector_clock_map(),
                                    );
                                }
                            }
                        }

                        println!("\x1b[1;31mDiffusion terminée et réussie !\x1b[0m");
                        should_reset = true;
                    } else {
                        log::debug!(
                            "On est dans le noeud {}. On a reçu un rouge de tous nos fils: on acquite au parent {}",
                            state.get_site_addr().to_string().as_str(),
                            state
                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
                                .to_string()
                                .as_str()
                        );
                        log::debug!(
                            "On devrait pouvoir construire une snapshot globale avec tous nos voisins et l'envoyer à l'adresse de notre parent {}",
                            state
                                .get_parent_addr_for_wave(message.message_initiator_id.clone())
                                .to_string()
                                .as_str()
                        );
                        // Intermediate node: in network mode, send the aggregated
                        // transactions up to our parent once the snapshot is complete.
                        if let MessageInfo::SnapshotResponse(resp) = message.info {
                            let mut mgr = crate::snapshot::LOCAL_SNAPSHOT_MANAGER.lock().await;
                            if mgr.mode == crate::snapshot::SnapshotMode::NetworkMode {
                                log::debug!("La snapshot devrait être envoyés au père");
                                if let Some(gs) = mgr.push(resp) {
                                    log::info!(
                                        "Global snapshot ready to be send to parent, hold per site : {:#?}",
                                        gs.missing
                                    );
                                    send_message(
                                        state.get_parent_addr_for_wave(
                                            message.message_initiator_id.clone(),
                                        ),
                                        MessageInfo::SnapshotResponse(
                                            crate::message::SnapshotResponse {
                                                site_id: state.get_site_id().to_string(),
                                                clock: state.get_clock(),
                                                tx_log: gs.all_transactions.into_iter().collect(),
                                            },
                                        ),
                                        None,
                                        NetworkMessageCode::SnapshotResponse,
                                        state.get_site_addr(),
                                        &state.get_site_id().to_string(),
                                        &message.message_initiator_id,
                                        message.message_initiator_addr,
                                        state.get_clock(),
                                    )
                                    .await?;
                                } else {
                                    log::error!("Le site aurait du récupérer toutes ses snapshots");
                                }
                            }
                        } else {
                            log::error!("Message de type SnapshotResponse attendu, mais pas reçu");
                        }
                    }

                    // Reset the wave state for this initiator.
                    let peer_count = state.get_nb_connected_neighbours();
                    state
                        .attended_neighbours_nb_for_transaction_wave
                        .insert(message.message_initiator_id.clone(), peer_count as i64);
                    state
                        .parent_addr_for_transaction_wave
                        .insert(message.message_initiator_id, "0.0.0.0:0".parse().unwrap());
                    if should_reset && state.pending_commands.len() == 0 {
                        state.release_mutex().await?;
                    };
                } else {
                    // Replies are still outstanding: just accumulate this response.
                    log::debug!(
                        "On a reçu un message rouge d'un des fils mais la vague n'est pas encore terminée"
                    );
                    if let MessageInfo::SnapshotResponse(resp) = message.info {
                        let mut mgr = crate::snapshot::LOCAL_SNAPSHOT_MANAGER.lock().await;
                        log::debug!("La snapshot devrait être ajoutés à l'état du manager");
                        if let Some(_) = mgr.push(resp) {
                            log::error!(
                                "On ne devrait pas encore pouvoir construire une snapshot globale vu que la vague n'est pas terminée"
                            );
                        }
                    } else {
                        log::error!("Message de type SnapshotResponse attendu, mais pas reçu");
                    }
                }
            }
        }

        // Update the local clock from the clock carried by the message.
        let mut state = LOCAL_APP_STATE.lock().await;
        state.update_clock(Some(&message.clock.clone())).await;
    }
}
1112
#[cfg(feature = "server")]
/// Builds a `Message`, encodes it with `rmp_serde` and queues it on the outbound
/// channel of `recipient_address`, connecting first when needed.
///
/// # Arguments
///
/// * `recipient_address` - Peer to send to. An unspecified IP or port 0 is skipped
///   with a warning and reported as `Ok(())`.
/// * `info`, `command`, `code` - Payload, optional command and message code.
/// * `local_addr`, `local_site` - Address and id written as the message sender.
/// * `initiator_id`, `initiator_addr` - Id and address written as the message initiator.
/// * `sender_clock` - Clock attached to the message.
///
/// # Errors
///
/// Returns an error when `code` is `Transaction` without a command, when encoding
/// fails, when the connection cannot be established, or when the outbound channel
/// rejects the buffer.
///
/// The `NETWORK_MANAGER` lock is released before the buffer is queued, so a full
/// channel for one peer does not stall sends to the others. A pooled sender whose
/// channel is already closed is replaced by a fresh connection.
pub async fn send_message(
    recipient_address: std::net::SocketAddr,
    info: crate::message::MessageInfo,
    command: Option<crate::control::Command>,
    code: crate::message::NetworkMessageCode,
    local_addr: std::net::SocketAddr,
    local_site: &str,
    initiator_id: &str,
    initiator_addr: std::net::SocketAddr,
    sender_clock: crate::clock::Clock,
) -> Result<(), Box<dyn std::error::Error>> {
    use crate::message::Message;
    use rmp_serde::encode;

    if code == crate::message::NetworkMessageCode::Transaction && command.is_none() {
        log::error!("Command is None for Transaction message");
        return Err("Command is None for Transaction message".into());
    }

    let msg = Message {
        sender_id: local_site.to_string(),
        sender_addr: local_addr,
        message_initiator_id: initiator_id.to_string(),
        clock: sender_clock,
        command,
        info,
        code,
        message_initiator_addr: initiator_addr,
    };

    if recipient_address.ip().is_unspecified() || recipient_address.port() == 0 {
        log::warn!("Skipping invalid peer address {}", recipient_address);
        return Ok(());
    }

    let buf = encode::to_vec(&msg)?;

    // Hold the manager lock only while looking up / creating the connection.
    let sender = {
        let mut manager = NETWORK_MANAGER.lock().await;

        // A closed sender means its writer task has ended; treat it as missing so
        // the connection is re-established instead of failing forever.
        let pooled = manager
            .get_sender(&recipient_address)
            .filter(|s| !s.is_closed());

        match pooled {
            Some(s) => s,
            None => {
                if let Err(e) = manager.create_connection(recipient_address).await {
                    return Err(
                        format!("error with connection to {}: {}", recipient_address, e).into(),
                    );
                }
                match manager.get_sender(&recipient_address) {
                    Some(s) => s,
                    None => {
                        let err_msg =
                            format!("Sender not found after connecting to {}", recipient_address);
                        log::error!("{}", err_msg);
                        return Err(err_msg.into());
                    }
                }
            }
        }
    };

    if let Err(e) = sender.send(buf).await {
        let err_msg = format!(
            "Impossible to send msg to {} due to error : {}",
            recipient_address, e
        );
        log::error!("{}", err_msg);
        return Err(err_msg.into());
    }
    log::debug!("Sent message {:?} to {}", &msg, recipient_address);
    Ok(())
}
1191
#[cfg(feature = "server")]
/// Sends `message` to every connected neighbour except the parent recorded for the
/// message initiator's wave.
///
/// Reads the local address, site id, neighbour list and wave parent from the local
/// state, releases the state lock, then delegates to `diffuse_message_without_lock`.
pub async fn diffuse_message(
    message: &crate::message::Message,
) -> Result<(), Box<dyn std::error::Error>> {
    use crate::state::LOCAL_APP_STATE;

    log::debug!(
        "Début de la diffusion d'un message de type {:?}",
        message.code
    );

    // Snapshot what is needed, then drop the guard before any network work.
    let guard = LOCAL_APP_STATE.lock().await;
    let own_addr = guard.get_site_addr();
    let own_id = guard.get_site_id();
    let neighbours = guard.get_connected_nei_addr();
    let wave_parent = guard.get_parent_addr_for_wave(message.message_initiator_id.clone());
    drop(guard);

    diffuse_message_without_lock(message, own_addr, &own_id, neighbours, wave_parent).await
}
1225
#[cfg(feature = "server")]
/// Sends `message` to each address in `connected_nei_addr` other than
/// `parent_address`, using `local_addr` / `site_id` as the sender.
///
/// A failed send to one neighbour is logged and does not stop the others; the
/// function always returns `Ok(())`.
pub async fn diffuse_message_without_lock(
    message: &crate::message::Message,
    local_addr: std::net::SocketAddr,
    site_id: &str,
    connected_nei_addr: Vec<std::net::SocketAddr>,
    parent_address: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error>> {
    let targets = connected_nei_addr
        .into_iter()
        .filter(move |candidate| *candidate != parent_address);

    for target in targets {
        let target_label = target.to_string();
        log::debug!("Sending message to: {}", target_label);

        if let Err(e) = send_message(
            target,
            message.info.clone(),
            message.command.clone(),
            message.code.clone(),
            local_addr,
            site_id,
            &message.message_initiator_id,
            message.message_initiator_addr,
            message.clock.clone(),
        )
        .await
        {
            log::error!("❌ Impossible d’envoyer à {} : {}", target_label, e);
        }
    }
    Ok(())
}
1261
#[cfg(test)]
#[cfg(feature = "server")]
mod tests {
    use super::*;
    use tokio::net::TcpListener;

    /// `send_message` succeeds against a local listener.
    ///
    /// The listener binds to port 0 so the OS picks a free port; a fixed port would
    /// make the test fail whenever that port is already in use.
    #[tokio::test]
    async fn test_send_message() -> Result<(), Box<dyn std::error::Error>> {
        use crate::clock::Clock;
        use crate::message::{MessageInfo, NetworkMessageCode};

        // Keep the listener alive for the whole test so the connect succeeds.
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let address: std::net::SocketAddr = listener.local_addr()?;

        let local_addr: std::net::SocketAddr = "127.0.0.1:8080".parse().unwrap();
        let local_site = "A";
        let clock = Clock::new();

        let code = NetworkMessageCode::Discovery;

        let send_result = send_message(
            address,
            MessageInfo::None,
            None,
            code,
            local_addr,
            local_site,
            local_site,
            local_addr,
            clock,
        )
        .await;
        assert!(send_result.is_ok(), "send_message failed: {:?}", send_result);
        Ok(())
    }
}