peillute/
control.rs

1//! Command handling and CLI interface
2//!
3//! This module provides the command-line interface and command handling functionality
4//! for the Peillute application, including both local and network command processing.
5
6#![cfg(feature = "server")]
7/// Worker that handles critical commands
8pub fn control_worker() {
9    tokio::spawn(async {
10        use crate::state::LOCAL_APP_STATE;
11
12        loop {
13            // ① Récupérer Notify sans garder le verrou
14            let notify = {
15                let st = LOCAL_APP_STATE.lock().await;
16                st.notify_sc.clone()
17            };
18
19            // réveille dès qu'on a la section critique
20            notify.notified().await;
21
22            // Vider la file de tsx en attente
23            {
24                let (in_st, waiting, nb_pending) = {
25                    let st = LOCAL_APP_STATE.lock().await;
26                    (st.in_sc, st.waiting_sc, st.pending_commands.len())
27                };
28
29                if !waiting && nb_pending > 0 && !in_st {
30                    let mut st = LOCAL_APP_STATE.lock().await;
31                    let _ = st.acquire_mutex().await;
32                    continue;
33                }
34
35                if in_st && nb_pending > 0 {
36                    log::info!("Début de la section critique");
37                    loop {
38                        let cmd_opt = {
39                            let mut st = LOCAL_APP_STATE.lock().await;
40                            st.pending_commands.pop_front()
41                        };
42                        if let Some(cmd) = cmd_opt {
43                            log::info!("Execute critical command");
44                            if let Err(e) = crate::control::execute_critical(cmd).await {
45                                log::error!("Erreur exécution commande critique : {}", e);
46                            }
47                        } else {
48                            break;
49                        }
50                    }
51                    log::info!("Fin de la section critique");
52                }
53            }
54        }
55    });
56}
57
58#[cfg(feature = "server")]
59/// Parse a line of input from the CLI and converts it to a Command
60pub fn parse_command(line: Result<Option<String>, std::io::Error>) -> Command {
61    use log;
62    match line {
63        Ok(Some(cmd)) => {
64            let command = match cmd.trim() {
65                "/create_user" => Command::CreateUser,
66                "/user_accounts" => Command::UserAccounts,
67                "/print_user_tsx" => Command::PrintUserTransactions,
68                "/print_tsx" => Command::PrintTransactions,
69                "/deposit" => Command::Deposit,
70                "/withdraw" => Command::Withdraw,
71                "/transfer" => Command::Transfer,
72                "/pay" => Command::Pay,
73                "/refund" => Command::Refund,
74                "/help" => Command::Help,
75                "/info" => Command::Info,
76                "/start_snapshot" => Command::Snapshot,
77                other => Command::Unknown(other.to_string()),
78            };
79            command
80        }
81        Ok(None) => {
82            println!("Aucun input");
83            Command::Unknown("Aucun input".to_string())
84        }
85        Err(e) => {
86            log::error!("Erreur de lecture stdin : {}", e);
87            Command::Error("Erreur de lecture stdin".to_string())
88        }
89    }
90}
91
92#[cfg(feature = "server")]
93/// Available commands in the system
94#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)]
95pub enum Command {
96    /// Create a new user account
97    CreateUser,
98    /// List all user accounts
99    UserAccounts,
100    /// Display transactions for a specific user
101    PrintUserTransactions,
102    /// Display all system transactions
103    PrintTransactions,
104    /// Deposit money into an account
105    Deposit,
106    /// Withdraw money from an account
107    Withdraw,
108    /// Transfer money between accounts
109    Transfer,
110    /// Make a payment
111    Pay,
112    /// Process a refund
113    Refund,
114    /// Display help information
115    Help,
116    /// Display system information
117    Info,
118    /// Unknown command
119    Unknown(String),
120    /// Error command
121    Error(String),
122    /// Start a system snapshot
123    Snapshot,
124}
125
126#[cfg(feature = "server")]
127/// Critical commands that can be executed on our site
128#[derive(Debug, Clone, PartialEq)]
129pub enum CriticalCommands {
130    /// Create a new user account
131    CreateUser { name: String },
132    /// Deposit money into an account
133    Deposit { name: String, amount: f64 },
134    /// Withdraw money from an account
135    Withdraw { name: String, amount: f64 },
136    /// Transfer money between accounts
137    Transfer {
138        from: String,
139        to: String,
140        amount: f64,
141    },
142    /// Make a payment
143    Pay { name: String, amount: f64 },
144    /// Process a refund
145    Refund {
146        name: String,
147        lamport: i64,
148        node: String,
149    },
150    /// Request a snapshot to save as a JSON
151    FileSnapshot,
152    /// Request a snapshot to update our database
153    SyncSnapshot,
154}
155
156#[cfg(feature = "server")]
157/// Enqueue a critical command
158pub async fn enqueue_critical(cmd: CriticalCommands) -> Result<(), Box<dyn std::error::Error>> {
159    use crate::state::LOCAL_APP_STATE;
160    let mut st = LOCAL_APP_STATE.lock().await;
161
162    st.pending_commands.push_back(cmd);
163
164    // si on n’est ni en SC ni déjà en attente → on déclenche la vague
165    if !st.in_sc && !st.waiting_sc {
166        st.acquire_mutex().await?;
167    }
168    Ok(())
169}
170
171#[cfg(feature = "server")]
172/// Execute a critical command on our site
173///
174/// Called by the control worker only when the Mutex is acquired
175pub async fn execute_critical(cmd: CriticalCommands) -> Result<(), Box<dyn std::error::Error>> {
176    use crate::message::{Message, MessageInfo, NetworkMessageCode};
177    use crate::network::diffuse_message;
178    use crate::state::LOCAL_APP_STATE;
179
180    let (clock, site_addr, site_id) = {
181        let mut state = LOCAL_APP_STATE.lock().await;
182        let local_addr = state.get_site_addr();
183        let node = state.get_site_id();
184        let _ = state.update_clock(None);
185        let clock = state.get_clock();
186        (clock, local_addr, node)
187    };
188
189    let msg;
190
191    match cmd {
192        CriticalCommands::CreateUser { name } => {
193            use crate::message::CreateUser;
194            super::db::create_user(&name)?;
195            msg = Message {
196                command: Some(Command::CreateUser),
197                info: MessageInfo::CreateUser(CreateUser::new(name)),
198                code: NetworkMessageCode::Transaction,
199                clock: clock,
200                sender_addr: site_addr,
201                sender_id: site_id.to_string(),
202                message_initiator_id: site_id.to_string(),
203                message_initiator_addr: site_addr,
204            };
205        }
206        CriticalCommands::Deposit { name, amount } => {
207            use crate::message::Deposit;
208
209            super::db::deposit(
210                &name,
211                amount,
212                clock.get_lamport(),
213                site_id.as_str(),
214                clock.get_vector_clock_map(),
215            )?;
216
217            msg = Message {
218                command: Some(Command::Deposit),
219                info: MessageInfo::Deposit(Deposit::new(name, amount)),
220                code: NetworkMessageCode::Transaction,
221                clock: clock,
222                sender_addr: site_addr,
223                sender_id: site_id.to_string(),
224                message_initiator_id: site_id.to_string(),
225                message_initiator_addr: site_addr,
226            };
227        }
228        CriticalCommands::Withdraw { name, amount } => {
229            use crate::message::Withdraw;
230            super::db::withdraw(
231                &name,
232                amount,
233                clock.get_lamport(),
234                site_id.as_str(),
235                clock.get_vector_clock_map(),
236            )?;
237
238            msg = Message {
239                command: Some(Command::Withdraw),
240                info: MessageInfo::Withdraw(Withdraw::new(name, amount)),
241                code: NetworkMessageCode::Transaction,
242                clock: clock,
243                sender_addr: site_addr,
244                sender_id: site_id.to_string(),
245                message_initiator_id: site_id.to_string(),
246                message_initiator_addr: site_addr,
247            };
248        }
249        CriticalCommands::Transfer { from, to, amount } => {
250            use crate::message::Transfer;
251            super::db::create_transaction(
252                &from,
253                &to,
254                amount,
255                clock.get_lamport(),
256                site_id.as_str(),
257                "",
258                clock.get_vector_clock_map(),
259            )?;
260            msg = Message {
261                command: Some(Command::Transfer),
262                info: MessageInfo::Transfer(Transfer::new(from.clone(), to.clone(), amount)),
263                code: NetworkMessageCode::Transaction,
264                clock: clock,
265                sender_addr: site_addr,
266                sender_id: site_id.to_string(),
267                message_initiator_id: site_id.to_string(),
268                message_initiator_addr: site_addr,
269            };
270        }
271        CriticalCommands::Pay { name, amount } => {
272            use crate::message::Pay;
273            super::db::create_transaction(
274                &name,
275                "NULL",
276                amount,
277                clock.get_lamport(),
278                site_id.as_str(),
279                "",
280                clock.get_vector_clock_map(),
281            )?;
282            msg = Message {
283                command: Some(Command::Pay),
284                info: MessageInfo::Pay(Pay::new(name, amount)),
285                code: NetworkMessageCode::Transaction,
286                clock: clock,
287                sender_addr: site_addr,
288                sender_id: site_id.to_string(),
289                message_initiator_id: site_id.to_string(),
290                message_initiator_addr: site_addr,
291            };
292        }
293        CriticalCommands::Refund {
294            name,
295            lamport,
296            node,
297        } => {
298            use crate::message::Refund;
299            super::db::refund_transaction(
300                lamport,
301                &node.as_str(),
302                clock.get_lamport(),
303                site_id.as_str(),
304                clock.get_vector_clock_map(),
305            )?;
306            msg = Message {
307                command: Some(Command::Refund),
308                info: MessageInfo::Refund(Refund::new(name, lamport, node)),
309                code: NetworkMessageCode::Transaction,
310                clock: clock,
311                sender_addr: site_addr,
312                sender_id: site_id.to_string(),
313                message_initiator_id: site_id.to_string(),
314                message_initiator_addr: site_addr,
315            };
316        }
317        CriticalCommands::FileSnapshot => {
318            use crate::snapshot;
319            snapshot::start_snapshot(snapshot::SnapshotMode::FileMode).await?;
320
321            msg = Message {
322                command: None,
323                code: NetworkMessageCode::SnapshotRequest,
324                info: MessageInfo::None,
325                sender_addr: site_addr,
326                sender_id: site_id.to_string(),
327                message_initiator_id: site_id.to_string(),
328                message_initiator_addr: site_addr,
329                clock: clock.clone(),
330            };
331        }
332        CriticalCommands::SyncSnapshot => {
333            use crate::snapshot;
334            snapshot::start_snapshot(snapshot::SnapshotMode::SyncMode).await?;
335
336            msg = Message {
337                command: None,
338                code: NetworkMessageCode::SnapshotRequest,
339                info: MessageInfo::None,
340                sender_addr: site_addr,
341                sender_id: site_id.to_string(),
342                message_initiator_id: site_id.to_string(),
343                message_initiator_addr: site_addr,
344                clock: clock.clone(),
345            };
346        }
347    }
348
349    let should_diffuse = {
350        // initialisation des paramètres avant la diffusion d'un message
351        let mut state = LOCAL_APP_STATE.lock().await;
352        let nb_neigh = state.get_nb_connected_neighbours();
353        state.set_parent_addr(site_id.to_string(), site_addr);
354        state.set_nb_nei_for_wave(site_id.to_string(), nb_neigh);
355        nb_neigh > 0
356    };
357
358    if should_diffuse {
359        diffuse_message(&msg).await?;
360    };
361    Ok(())
362}
363
364#[cfg(feature = "server")]
365/// Execute a command from the CLI
366/// Update the clock of the site
367/// Interact with the database
368/// Implement our wave diffusion protocol
369pub async fn process_cli_command(cmd: Command) -> Result<(), Box<dyn std::error::Error>> {
370    use crate::state::LOCAL_APP_STATE;
371
372    match cmd {
373        Command::CreateUser => {
374            let name = prompt("Username");
375            enqueue_critical(CriticalCommands::CreateUser { name }).await?;
376        }
377
378        Command::UserAccounts => {
379            super::db::print_users()?;
380        }
381
382        Command::PrintUserTransactions => {
383            let name = prompt("Username");
384            super::db::print_transaction_for_user(&name)?;
385        }
386
387        Command::PrintTransactions => {
388            super::db::print_transactions()?;
389        }
390
391        Command::Deposit => {
392            let name = prompt("Username");
393            let amount = prompt_parse::<f64>("Deposit amount");
394            enqueue_critical(CriticalCommands::Deposit {
395                name: name,
396                amount: amount,
397            })
398            .await?;
399        }
400
401        Command::Withdraw => {
402            let name = prompt("Username");
403            let amount = prompt_parse::<f64>("Withdraw amount");
404            if amount < 0.0 {}
405            enqueue_critical(CriticalCommands::Withdraw {
406                name: name,
407                amount: amount,
408            })
409            .await?;
410        }
411
412        Command::Transfer => {
413            let name = prompt("Username");
414
415            let amount = prompt_parse::<f64>("Transfer amount");
416            let _ = super::db::print_users();
417            let beneficiary = prompt("Beneficiary");
418
419            enqueue_critical(CriticalCommands::Transfer {
420                from: name.clone(),
421                to: beneficiary.clone(),
422                amount,
423            })
424            .await?;
425        }
426
427        Command::Pay => {
428            let name = prompt("Username");
429            let amount = prompt_parse::<f64>("Payment amount");
430
431            if amount <= 0.0 {
432                println!("❌ Amount must be positive");
433                return Ok(());
434            }
435            enqueue_critical(CriticalCommands::Pay {
436                name: name.clone(),
437                amount,
438            })
439            .await?;
440        }
441
442        Command::Refund => {
443            let name = prompt("Username");
444            super::db::print_transaction_for_user(&name).unwrap();
445
446            let transac_time = prompt_parse::<i64>("Lamport time");
447            let transac_node = prompt("Node");
448
449            enqueue_critical(CriticalCommands::Refund {
450                name: name.clone(),
451                lamport: transac_time,
452                node: transac_node.clone(),
453            })
454            .await?;
455        }
456
457        Command::Help => {
458            println!("📜 Command list:");
459            println!("----------------------------------------");
460            println!("/create_user      - Create a personal account");
461            println!("/user_accounts    - List all users");
462            println!("/print_user_tsx   - Show a user's transactions");
463            println!("/print_tsx        - Show all system transactions");
464            println!("/deposit          - Deposit money to an account");
465            println!("/withdraw         - Withdraw money from an account");
466            println!("/transfer         - Transfer money to another user");
467            println!("/pay              - Make a payment (to NULL)");
468            println!("/refund           - Refund a transaction");
469            println!("/info             - Show system information");
470            println!("/start_snapshot   - Start a snapshot");
471            println!("/help             - Show this help message");
472            println!("----------------------------------------");
473        }
474
475        Command::Snapshot => {
476            println!("📸 Starting snapshot...");
477            enqueue_critical(CriticalCommands::FileSnapshot).await?;
478        }
479
480        Command::Info => {
481            let (
482                site_addr,
483                site_id,
484                peer_addrs,
485                clock,
486                nb_connected_neighbours,
487                connected_neighbours_addrs,
488                parent_addr_for_transaction_wave,
489                attended_neighbours_nb_for_transaction_wave,
490            ) = {
491                let state = LOCAL_APP_STATE.lock().await;
492                (
493                    state.get_site_addr(),
494                    state.get_site_id().to_string(),
495                    state.get_cli_peers_addrs(),
496                    state.get_clock(),
497                    state.get_nb_connected_neighbours(),
498                    state.get_connected_nei_addr(),
499                    state.get_parent_for_wave_map(),
500                    state.get_nb_nei_for_wave(),
501                )
502            };
503
504            let db_path = {
505                let conn = crate::db::DB_CONN.lock().unwrap();
506                let path = conn.path().unwrap();
507                // keep only the name of the file (after the last "/")
508                path.to_string().split("/").last().unwrap().to_string()
509            };
510
511            println!("📊 System Information:");
512            println!("----------------------------------------");
513            println!("Database : {}", db_path);
514            println!("Local Address: {}", site_addr);
515            println!("Site ID: {}", site_id);
516            println!("Number of CLI peers: {}", peer_addrs.len());
517            println!("CLI peers: {:?}", peer_addrs);
518            println!("Number of connected neighbors: {}", nb_connected_neighbours);
519            println!(
520                "Number of connected neighbors: {:?}",
521                connected_neighbours_addrs
522            );
523            println!("Vector Clock: {:?}", clock.get_vector_clock_map());
524            println!("Lamport Clock: {}", clock.get_lamport());
525            println!("--------- Wave diffusion info ------------");
526            println!(
527                "Parent addresses for wave (if any): {:?}",
528                parent_addr_for_transaction_wave
529            );
530            println!(
531                "Attended neighbours for wave (if any): {:?}",
532                attended_neighbours_nb_for_transaction_wave
533            );
534            println!("----------------------------------------");
535        }
536
537        Command::Unknown(msg) => {
538            println!("❌ Unknown command: {}", msg);
539        }
540
541        Command::Error(msg) => {
542            println!("❌ Error: {}", msg);
543        }
544    }
545
546    Ok(())
547}
548
549#[cfg(feature = "server")]
550/// Process commands received from the network
551/// Update the clock of the site
552/// Interact with the database
553pub async fn process_network_command(
554    msg: crate::message::MessageInfo,
555    received_clock: crate::clock::Clock,
556    sender_id: &str,
557) -> Result<(), Box<dyn std::error::Error>> {
558    use crate::message::MessageInfo;
559    use log;
560
561    let message_lamport_time = received_clock.get_lamport();
562    let message_vc_clock = received_clock.get_vector_clock_map();
563
564    if crate::db::transaction_exists(*message_lamport_time, sender_id)? {
565        log::info!("Transaction allready exists, skipping");
566        return Ok(());
567    }
568
569    match msg {
570        crate::message::MessageInfo::CreateUser(create_user) => {
571            if crate::db::user_exists(&create_user.name)? {
572                log::info!("User already exists, skipping");
573                return Ok(());
574            }
575            super::db::create_user(&create_user.name)?;
576        }
577        crate::message::MessageInfo::Deposit(deposit) => {
578            super::db::deposit(
579                &deposit.name,
580                deposit.amount,
581                &message_lamport_time,
582                sender_id,
583                &message_vc_clock,
584            )?;
585        }
586
587        MessageInfo::Withdraw(withdraw) => {
588            super::db::withdraw(
589                &withdraw.name,
590                withdraw.amount,
591                &message_lamport_time,
592                sender_id,
593                &message_vc_clock,
594            )?;
595        }
596
597        MessageInfo::Transfer(transfer) => {
598            super::db::create_transaction(
599                &transfer.name,
600                &transfer.beneficiary,
601                transfer.amount,
602                &message_lamport_time,
603                sender_id,
604                "",
605                &message_vc_clock,
606            )?;
607        }
608
609        MessageInfo::Pay(pay) => {
610            super::db::create_transaction(
611                &pay.name,
612                "NULL",
613                pay.amount,
614                &message_lamport_time,
615                sender_id,
616                "",
617                &message_vc_clock,
618            )?;
619        }
620
621        MessageInfo::Refund(refund) => {
622            super::db::refund_transaction(
623                refund.transac_time,
624                &refund.transac_node,
625                &message_lamport_time,
626                sender_id,
627                &message_vc_clock,
628            )?;
629        }
630        crate::message::MessageInfo::SnapshotResponse(_) => {
631            log::error!("Should not process snapshot response");
632        }
633        crate::message::MessageInfo::AckMutex(_) => {
634            // Handle mutex acknowledgment
635        }
636        crate::message::MessageInfo::AcquireMutex(_) => {
637            // Handle mutex request
638        }
639        crate::message::MessageInfo::ReleaseMutex(_) => {
640            // Handle mutex release
641        }
642        crate::message::MessageInfo::None => {
643            log::error!("Should not process None message");
644        }
645        crate::message::MessageInfo::Acknowledge(_) => {
646            log::error!("Should not process Acknowledge message");
647        }
648    }
649
650    Ok(())
651}
652
653#[cfg(feature = "server")]
654/// Prompts the user for input with a label
655fn prompt(label: &str) -> String {
656    use std::io::{self, Write};
657    print!("{}: ", label);
658    io::stdout().flush().unwrap();
659    let mut input = String::new();
660    io::stdin().read_line(&mut input).unwrap();
661    input.trim().to_string()
662}
663
664#[cfg(feature = "server")]
665/// Prompts the user for input and parses it to a specific type
666fn prompt_parse<T: std::str::FromStr>(label: &str) -> T
667where
668    T::Err: std::fmt::Debug,
669{
670    use std::io::{self, Write};
671    loop {
672        print!("{}: ", label);
673        io::stdout().flush().unwrap();
674        let mut input = String::new();
675        io::stdin().read_line(&mut input).unwrap();
676        match input.trim().parse() {
677            Ok(value) => return value,
678            Err(e) => println!("Invalid input: {:?}", e),
679        }
680    }
681}
682
683#[cfg(feature = "server")]
684#[tokio::test]
685async fn test_mutex_critical_section_high_load() {
686    use crate::state::{AppState, MutexStamp, MutexTag};
687    use std::net::SocketAddr;
688
689    let local_addr: SocketAddr = "127.0.0.1:9000".parse().unwrap();
690    let mut state = AppState::new(
691        "A".to_string(),
692        vec![
693            "127.0.0.1:9001".parse().unwrap(),
694            "127.0.0.1:9002".parse().unwrap(),
695        ],
696        local_addr,
697    );
698
699    // Set manually the number of connected neighbours
700    state.set_nb_connected_neighbours(2);
701
702    // Simulate remote requests in FIFO before our own
703    state.global_mutex_fifo.insert(
704        "B".to_string(),
705        MutexStamp {
706            tag: MutexTag::Request,
707            date: 1,
708        },
709    );
710
711    state.global_mutex_fifo.insert(
712        "C".to_string(),
713        MutexStamp {
714            tag: MutexTag::Request,
715            date: 2,
716        },
717    );
718
719    // Now request our own access with a higher Lamport (should wait)
720    for _ in 0..3 {
721        state.update_clock(None).await;
722    }
723    let _ = state.acquire_mutex().await;
724
725    // Our site should not be in SC yet
726    assert_eq!(state.in_sc, false);
727
728    // Insert ACKs from all peers with lower Lamport (simulate reception)
729    state.global_mutex_fifo.insert(
730        "B".to_string(),
731        MutexStamp {
732            tag: MutexTag::Ack,
733            date: 1,
734        },
735    );
736    state.global_mutex_fifo.insert(
737        "C".to_string(),
738        MutexStamp {
739            tag: MutexTag::Ack,
740            date: 2,
741        },
742    );
743
744    // Manually call try_enter_sc() to simulate triggering by incoming ack
745    state.try_enter_sc();
746
747    // Now we should be in the section critique
748    assert_eq!(state.in_sc, true);
749
750    // Simulate some work and then release
751    let _ = state.release_mutex().await;
752
753    // After release, should no longer be in critical section
754    assert_eq!(state.in_sc, false);
755    assert_eq!(state.waiting_sc, false);
756
757    // All entries should be cleaned up
758    assert!(!state.global_mutex_fifo.contains_key("A"));
759
760    // Simulate again to check order with large number of requests
761    for i in 0..100 {
762        let site = format!("S{}", i);
763        state.global_mutex_fifo.insert(
764            site.clone(),
765            MutexStamp {
766                tag: MutexTag::Request,
767                date: i,
768            },
769        );
770    }
771
772    // Now site A requests with date = 50 (should wait since lower stamps exist)
773    for _ in 0..50 {
774        state.update_clock(None).await;
775    }
776    let _ = state.acquire_mutex().await;
777    state.try_enter_sc();
778    assert_eq!(state.in_sc, false); // can't enter yet
779
780    // Now convert all others to ACK
781    for i in 0..100 {
782        let site = format!("S{}", i);
783        state.global_mutex_fifo.insert(
784            site.clone(),
785            MutexStamp {
786                tag: MutexTag::Ack,
787                date: i,
788            },
789        );
790    }
791
792    // Try entering again
793    state.try_enter_sc();
794    assert_eq!(state.in_sc, true); // should succeed now
795}