1#![cfg(feature = "server")]
7pub fn control_worker() {
9 tokio::spawn(async {
10 use crate::state::LOCAL_APP_STATE;
11
12 loop {
13 let notify = {
15 let st = LOCAL_APP_STATE.lock().await;
16 st.notify_sc.clone()
17 };
18
19 notify.notified().await;
21
22 {
24 let (in_st, waiting, nb_pending) = {
25 let st = LOCAL_APP_STATE.lock().await;
26 (st.in_sc, st.waiting_sc, st.pending_commands.len())
27 };
28
29 if !waiting && nb_pending > 0 && !in_st {
30 let mut st = LOCAL_APP_STATE.lock().await;
31 let _ = st.acquire_mutex().await;
32 continue;
33 }
34
35 if in_st && nb_pending > 0 {
36 log::info!("Début de la section critique");
37 loop {
38 let cmd_opt = {
39 let mut st = LOCAL_APP_STATE.lock().await;
40 st.pending_commands.pop_front()
41 };
42 if let Some(cmd) = cmd_opt {
43 log::info!("Execute critical command");
44 if let Err(e) = crate::control::execute_critical(cmd).await {
45 log::error!("Erreur exécution commande critique : {}", e);
46 }
47 } else {
48 break;
49 }
50 }
51 log::info!("Fin de la section critique");
52 }
53 }
54 }
55 });
56}
57
58#[cfg(feature = "server")]
59pub fn parse_command(line: Result<Option<String>, std::io::Error>) -> Command {
61 use log;
62 match line {
63 Ok(Some(cmd)) => {
64 let command = match cmd.trim() {
65 "/create_user" => Command::CreateUser,
66 "/user_accounts" => Command::UserAccounts,
67 "/print_user_tsx" => Command::PrintUserTransactions,
68 "/print_tsx" => Command::PrintTransactions,
69 "/deposit" => Command::Deposit,
70 "/withdraw" => Command::Withdraw,
71 "/transfer" => Command::Transfer,
72 "/pay" => Command::Pay,
73 "/refund" => Command::Refund,
74 "/help" => Command::Help,
75 "/info" => Command::Info,
76 "/start_snapshot" => Command::Snapshot,
77 other => Command::Unknown(other.to_string()),
78 };
79 command
80 }
81 Ok(None) => {
82 println!("Aucun input");
83 Command::Unknown("Aucun input".to_string())
84 }
85 Err(e) => {
86 log::error!("Erreur de lecture stdin : {}", e);
87 Command::Error("Erreur de lecture stdin".to_string())
88 }
89 }
90}
91
92#[cfg(feature = "server")]
93#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
95pub enum Command {
96 CreateUser,
98 UserAccounts,
100 PrintUserTransactions,
102 PrintTransactions,
104 Deposit,
106 Withdraw,
108 Transfer,
110 Pay,
112 Refund,
114 Help,
116 Info,
118 Unknown(String),
120 Error(String),
122 Snapshot,
124}
125
126#[cfg(feature = "server")]
127#[derive(Debug, Clone, PartialEq)]
129pub enum CriticalCommands {
130 CreateUser { name: String },
132 Deposit { name: String, amount: f64 },
134 Withdraw { name: String, amount: f64 },
136 Transfer {
138 from: String,
139 to: String,
140 amount: f64,
141 },
142 Pay { name: String, amount: f64 },
144 Refund {
146 name: String,
147 lamport: i64,
148 node: String,
149 },
150 FileSnapshot,
152 SyncSnapshot,
154}
155
156#[cfg(feature = "server")]
157pub async fn enqueue_critical(cmd: CriticalCommands) -> Result<(), Box<dyn std::error::Error>> {
159 use crate::state::LOCAL_APP_STATE;
160 let mut st = LOCAL_APP_STATE.lock().await;
161
162 st.pending_commands.push_back(cmd);
163
164 if !st.in_sc && !st.waiting_sc {
166 st.acquire_mutex().await?;
167 }
168 Ok(())
169}
170
171#[cfg(feature = "server")]
172pub async fn execute_critical(cmd: CriticalCommands) -> Result<(), Box<dyn std::error::Error>> {
176 use crate::message::{Message, MessageInfo, NetworkMessageCode};
177 use crate::network::diffuse_message;
178 use crate::state::LOCAL_APP_STATE;
179
180 let (clock, site_addr, site_id) = {
181 let mut state = LOCAL_APP_STATE.lock().await;
182 let local_addr = state.get_site_addr();
183 let node = state.get_site_id();
184 let _ = state.update_clock(None);
185 let clock = state.get_clock();
186 (clock, local_addr, node)
187 };
188
189 let msg;
190
191 match cmd {
192 CriticalCommands::CreateUser { name } => {
193 use crate::message::CreateUser;
194 super::db::create_user(&name)?;
195 msg = Message {
196 command: Some(Command::CreateUser),
197 info: MessageInfo::CreateUser(CreateUser::new(name)),
198 code: NetworkMessageCode::Transaction,
199 clock: clock,
200 sender_addr: site_addr,
201 sender_id: site_id.to_string(),
202 message_initiator_id: site_id.to_string(),
203 message_initiator_addr: site_addr,
204 };
205 }
206 CriticalCommands::Deposit { name, amount } => {
207 use crate::message::Deposit;
208
209 super::db::deposit(
210 &name,
211 amount,
212 clock.get_lamport(),
213 site_id.as_str(),
214 clock.get_vector_clock_map(),
215 )?;
216
217 msg = Message {
218 command: Some(Command::Deposit),
219 info: MessageInfo::Deposit(Deposit::new(name, amount)),
220 code: NetworkMessageCode::Transaction,
221 clock: clock,
222 sender_addr: site_addr,
223 sender_id: site_id.to_string(),
224 message_initiator_id: site_id.to_string(),
225 message_initiator_addr: site_addr,
226 };
227 }
228 CriticalCommands::Withdraw { name, amount } => {
229 use crate::message::Withdraw;
230 super::db::withdraw(
231 &name,
232 amount,
233 clock.get_lamport(),
234 site_id.as_str(),
235 clock.get_vector_clock_map(),
236 )?;
237
238 msg = Message {
239 command: Some(Command::Withdraw),
240 info: MessageInfo::Withdraw(Withdraw::new(name, amount)),
241 code: NetworkMessageCode::Transaction,
242 clock: clock,
243 sender_addr: site_addr,
244 sender_id: site_id.to_string(),
245 message_initiator_id: site_id.to_string(),
246 message_initiator_addr: site_addr,
247 };
248 }
249 CriticalCommands::Transfer { from, to, amount } => {
250 use crate::message::Transfer;
251 super::db::create_transaction(
252 &from,
253 &to,
254 amount,
255 clock.get_lamport(),
256 site_id.as_str(),
257 "",
258 clock.get_vector_clock_map(),
259 )?;
260 msg = Message {
261 command: Some(Command::Transfer),
262 info: MessageInfo::Transfer(Transfer::new(from.clone(), to.clone(), amount)),
263 code: NetworkMessageCode::Transaction,
264 clock: clock,
265 sender_addr: site_addr,
266 sender_id: site_id.to_string(),
267 message_initiator_id: site_id.to_string(),
268 message_initiator_addr: site_addr,
269 };
270 }
271 CriticalCommands::Pay { name, amount } => {
272 use crate::message::Pay;
273 super::db::create_transaction(
274 &name,
275 "NULL",
276 amount,
277 clock.get_lamport(),
278 site_id.as_str(),
279 "",
280 clock.get_vector_clock_map(),
281 )?;
282 msg = Message {
283 command: Some(Command::Pay),
284 info: MessageInfo::Pay(Pay::new(name, amount)),
285 code: NetworkMessageCode::Transaction,
286 clock: clock,
287 sender_addr: site_addr,
288 sender_id: site_id.to_string(),
289 message_initiator_id: site_id.to_string(),
290 message_initiator_addr: site_addr,
291 };
292 }
293 CriticalCommands::Refund {
294 name,
295 lamport,
296 node,
297 } => {
298 use crate::message::Refund;
299 super::db::refund_transaction(
300 lamport,
301 &node.as_str(),
302 clock.get_lamport(),
303 site_id.as_str(),
304 clock.get_vector_clock_map(),
305 )?;
306 msg = Message {
307 command: Some(Command::Refund),
308 info: MessageInfo::Refund(Refund::new(name, lamport, node)),
309 code: NetworkMessageCode::Transaction,
310 clock: clock,
311 sender_addr: site_addr,
312 sender_id: site_id.to_string(),
313 message_initiator_id: site_id.to_string(),
314 message_initiator_addr: site_addr,
315 };
316 }
317 CriticalCommands::FileSnapshot => {
318 use crate::snapshot;
319 snapshot::start_snapshot(snapshot::SnapshotMode::FileMode).await?;
320
321 msg = Message {
322 command: None,
323 code: NetworkMessageCode::SnapshotRequest,
324 info: MessageInfo::None,
325 sender_addr: site_addr,
326 sender_id: site_id.to_string(),
327 message_initiator_id: site_id.to_string(),
328 message_initiator_addr: site_addr,
329 clock: clock.clone(),
330 };
331 }
332 CriticalCommands::SyncSnapshot => {
333 use crate::snapshot;
334 snapshot::start_snapshot(snapshot::SnapshotMode::SyncMode).await?;
335
336 msg = Message {
337 command: None,
338 code: NetworkMessageCode::SnapshotRequest,
339 info: MessageInfo::None,
340 sender_addr: site_addr,
341 sender_id: site_id.to_string(),
342 message_initiator_id: site_id.to_string(),
343 message_initiator_addr: site_addr,
344 clock: clock.clone(),
345 };
346 }
347 }
348
349 let should_diffuse = {
350 let mut state = LOCAL_APP_STATE.lock().await;
352 let nb_neigh = state.get_nb_connected_neighbours();
353 state.set_parent_addr(site_id.to_string(), site_addr);
354 state.set_nb_nei_for_wave(site_id.to_string(), nb_neigh);
355 nb_neigh > 0
356 };
357
358 if should_diffuse {
359 diffuse_message(&msg).await?;
360 };
361 Ok(())
362}
363
364#[cfg(feature = "server")]
365pub async fn process_cli_command(cmd: Command) -> Result<(), Box<dyn std::error::Error>> {
370 use crate::state::LOCAL_APP_STATE;
371
372 match cmd {
373 Command::CreateUser => {
374 let name = prompt("Username");
375 enqueue_critical(CriticalCommands::CreateUser { name }).await?;
376 }
377
378 Command::UserAccounts => {
379 super::db::print_users()?;
380 }
381
382 Command::PrintUserTransactions => {
383 let name = prompt("Username");
384 super::db::print_transaction_for_user(&name)?;
385 }
386
387 Command::PrintTransactions => {
388 super::db::print_transactions()?;
389 }
390
391 Command::Deposit => {
392 let name = prompt("Username");
393 let amount = prompt_parse::<f64>("Deposit amount");
394 enqueue_critical(CriticalCommands::Deposit {
395 name: name,
396 amount: amount,
397 })
398 .await?;
399 }
400
401 Command::Withdraw => {
402 let name = prompt("Username");
403 let amount = prompt_parse::<f64>("Withdraw amount");
404 if amount < 0.0 {}
405 enqueue_critical(CriticalCommands::Withdraw {
406 name: name,
407 amount: amount,
408 })
409 .await?;
410 }
411
412 Command::Transfer => {
413 let name = prompt("Username");
414
415 let amount = prompt_parse::<f64>("Transfer amount");
416 let _ = super::db::print_users();
417 let beneficiary = prompt("Beneficiary");
418
419 enqueue_critical(CriticalCommands::Transfer {
420 from: name.clone(),
421 to: beneficiary.clone(),
422 amount,
423 })
424 .await?;
425 }
426
427 Command::Pay => {
428 let name = prompt("Username");
429 let amount = prompt_parse::<f64>("Payment amount");
430
431 if amount <= 0.0 {
432 println!("❌ Amount must be positive");
433 return Ok(());
434 }
435 enqueue_critical(CriticalCommands::Pay {
436 name: name.clone(),
437 amount,
438 })
439 .await?;
440 }
441
442 Command::Refund => {
443 let name = prompt("Username");
444 super::db::print_transaction_for_user(&name).unwrap();
445
446 let transac_time = prompt_parse::<i64>("Lamport time");
447 let transac_node = prompt("Node");
448
449 enqueue_critical(CriticalCommands::Refund {
450 name: name.clone(),
451 lamport: transac_time,
452 node: transac_node.clone(),
453 })
454 .await?;
455 }
456
457 Command::Help => {
458 println!("📜 Command list:");
459 println!("----------------------------------------");
460 println!("/create_user - Create a personal account");
461 println!("/user_accounts - List all users");
462 println!("/print_user_tsx - Show a user's transactions");
463 println!("/print_tsx - Show all system transactions");
464 println!("/deposit - Deposit money to an account");
465 println!("/withdraw - Withdraw money from an account");
466 println!("/transfer - Transfer money to another user");
467 println!("/pay - Make a payment (to NULL)");
468 println!("/refund - Refund a transaction");
469 println!("/info - Show system information");
470 println!("/start_snapshot - Start a snapshot");
471 println!("/help - Show this help message");
472 println!("----------------------------------------");
473 }
474
475 Command::Snapshot => {
476 println!("📸 Starting snapshot...");
477 enqueue_critical(CriticalCommands::FileSnapshot).await?;
478 }
479
480 Command::Info => {
481 let (
482 site_addr,
483 site_id,
484 peer_addrs,
485 clock,
486 nb_connected_neighbours,
487 connected_neighbours_addrs,
488 parent_addr_for_transaction_wave,
489 attended_neighbours_nb_for_transaction_wave,
490 ) = {
491 let state = LOCAL_APP_STATE.lock().await;
492 (
493 state.get_site_addr(),
494 state.get_site_id().to_string(),
495 state.get_cli_peers_addrs(),
496 state.get_clock(),
497 state.get_nb_connected_neighbours(),
498 state.get_connected_nei_addr(),
499 state.get_parent_for_wave_map(),
500 state.get_nb_nei_for_wave(),
501 )
502 };
503
504 let db_path = {
505 let conn = crate::db::DB_CONN.lock().unwrap();
506 let path = conn.path().unwrap();
507 path.to_string().split("/").last().unwrap().to_string()
509 };
510
511 println!("📊 System Information:");
512 println!("----------------------------------------");
513 println!("Database : {}", db_path);
514 println!("Local Address: {}", site_addr);
515 println!("Site ID: {}", site_id);
516 println!("Number of CLI peers: {}", peer_addrs.len());
517 println!("CLI peers: {:?}", peer_addrs);
518 println!("Number of connected neighbors: {}", nb_connected_neighbours);
519 println!(
520 "Number of connected neighbors: {:?}",
521 connected_neighbours_addrs
522 );
523 println!("Vector Clock: {:?}", clock.get_vector_clock_map());
524 println!("Lamport Clock: {}", clock.get_lamport());
525 println!("--------- Wave diffusion info ------------");
526 println!(
527 "Parent addresses for wave (if any): {:?}",
528 parent_addr_for_transaction_wave
529 );
530 println!(
531 "Attended neighbours for wave (if any): {:?}",
532 attended_neighbours_nb_for_transaction_wave
533 );
534 println!("----------------------------------------");
535 }
536
537 Command::Unknown(msg) => {
538 println!("❌ Unknown command: {}", msg);
539 }
540
541 Command::Error(msg) => {
542 println!("❌ Error: {}", msg);
543 }
544 }
545
546 Ok(())
547}
548
549#[cfg(feature = "server")]
550pub async fn process_network_command(
554 msg: crate::message::MessageInfo,
555 received_clock: crate::clock::Clock,
556 sender_id: &str,
557) -> Result<(), Box<dyn std::error::Error>> {
558 use crate::message::MessageInfo;
559 use log;
560
561 let message_lamport_time = received_clock.get_lamport();
562 let message_vc_clock = received_clock.get_vector_clock_map();
563
564 if crate::db::transaction_exists(*message_lamport_time, sender_id)? {
565 log::info!("Transaction allready exists, skipping");
566 return Ok(());
567 }
568
569 match msg {
570 crate::message::MessageInfo::CreateUser(create_user) => {
571 if crate::db::user_exists(&create_user.name)? {
572 log::info!("User already exists, skipping");
573 return Ok(());
574 }
575 super::db::create_user(&create_user.name)?;
576 }
577 crate::message::MessageInfo::Deposit(deposit) => {
578 super::db::deposit(
579 &deposit.name,
580 deposit.amount,
581 &message_lamport_time,
582 sender_id,
583 &message_vc_clock,
584 )?;
585 }
586
587 MessageInfo::Withdraw(withdraw) => {
588 super::db::withdraw(
589 &withdraw.name,
590 withdraw.amount,
591 &message_lamport_time,
592 sender_id,
593 &message_vc_clock,
594 )?;
595 }
596
597 MessageInfo::Transfer(transfer) => {
598 super::db::create_transaction(
599 &transfer.name,
600 &transfer.beneficiary,
601 transfer.amount,
602 &message_lamport_time,
603 sender_id,
604 "",
605 &message_vc_clock,
606 )?;
607 }
608
609 MessageInfo::Pay(pay) => {
610 super::db::create_transaction(
611 &pay.name,
612 "NULL",
613 pay.amount,
614 &message_lamport_time,
615 sender_id,
616 "",
617 &message_vc_clock,
618 )?;
619 }
620
621 MessageInfo::Refund(refund) => {
622 super::db::refund_transaction(
623 refund.transac_time,
624 &refund.transac_node,
625 &message_lamport_time,
626 sender_id,
627 &message_vc_clock,
628 )?;
629 }
630 crate::message::MessageInfo::SnapshotResponse(_) => {
631 log::error!("Should not process snapshot response");
632 }
633 crate::message::MessageInfo::AckMutex(_) => {
634 }
636 crate::message::MessageInfo::AcquireMutex(_) => {
637 }
639 crate::message::MessageInfo::ReleaseMutex(_) => {
640 }
642 crate::message::MessageInfo::None => {
643 log::error!("Should not process None message");
644 }
645 crate::message::MessageInfo::Acknowledge(_) => {
646 log::error!("Should not process Acknowledge message");
647 }
648 }
649
650 Ok(())
651}
652
653#[cfg(feature = "server")]
654fn prompt(label: &str) -> String {
656 use std::io::{self, Write};
657 print!("{}: ", label);
658 io::stdout().flush().unwrap();
659 let mut input = String::new();
660 io::stdin().read_line(&mut input).unwrap();
661 input.trim().to_string()
662}
663
664#[cfg(feature = "server")]
665fn prompt_parse<T: std::str::FromStr>(label: &str) -> T
667where
668 T::Err: std::fmt::Debug,
669{
670 use std::io::{self, Write};
671 loop {
672 print!("{}: ", label);
673 io::stdout().flush().unwrap();
674 let mut input = String::new();
675 io::stdin().read_line(&mut input).unwrap();
676 match input.trim().parse() {
677 Ok(value) => return value,
678 Err(e) => println!("Invalid input: {:?}", e),
679 }
680 }
681}
682
683#[cfg(feature = "server")]
684#[tokio::test]
685async fn test_mutex_critical_section_high_load() {
686 use crate::state::{AppState, MutexStamp, MutexTag};
687 use std::net::SocketAddr;
688
689 let local_addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
690 let mut state = AppState::new(
691 "A".to_string(),
692 vec![
693 "127.0.0.1:9001".parse().unwrap(),
694 "127.0.0.1:9002".parse().unwrap(),
695 ],
696 local_addr,
697 );
698
699 state.set_nb_connected_neighbours(2);
701
702 state.global_mutex_fifo.insert(
704 "B".to_string(),
705 MutexStamp {
706 tag: MutexTag::Request,
707 date: 1,
708 },
709 );
710
711 state.global_mutex_fifo.insert(
712 "C".to_string(),
713 MutexStamp {
714 tag: MutexTag::Request,
715 date: 2,
716 },
717 );
718
719 for _ in 0..3 {
721 state.update_clock(None).await;
722 }
723 let _ = state.acquire_mutex().await;
724
725 assert_eq!(state.in_sc, false);
727
728 state.global_mutex_fifo.insert(
730 "B".to_string(),
731 MutexStamp {
732 tag: MutexTag::Ack,
733 date: 1,
734 },
735 );
736 state.global_mutex_fifo.insert(
737 "C".to_string(),
738 MutexStamp {
739 tag: MutexTag::Ack,
740 date: 2,
741 },
742 );
743
744 state.try_enter_sc();
746
747 assert_eq!(state.in_sc, true);
749
750 let _ = state.release_mutex().await;
752
753 assert_eq!(state.in_sc, false);
755 assert_eq!(state.waiting_sc, false);
756
757 assert!(!state.global_mutex_fifo.contains_key("A"));
759
760 for i in 0..100 {
762 let site = format!("S{}", i);
763 state.global_mutex_fifo.insert(
764 site.clone(),
765 MutexStamp {
766 tag: MutexTag::Request,
767 date: i,
768 },
769 );
770 }
771
772 for _ in 0..50 {
774 state.update_clock(None).await;
775 }
776 let _ = state.acquire_mutex().await;
777 state.try_enter_sc();
778 assert_eq!(state.in_sc, false); for i in 0..100 {
782 let site = format!("S{}", i);
783 state.global_mutex_fifo.insert(
784 site.clone(),
785 MutexStamp {
786 tag: MutexTag::Ack,
787 date: i,
788 },
789 );
790 }
791
792 state.try_enter_sc();
794 assert_eq!(state.in_sc, true); }