ys memos

Blog

Rustで簡単なKVSを実装してみた ~4. KvsServer実装~


rust

2023/11/04


  1. 概要
  2. 型定義など
  3. handler定義
  4. Query定義
  5. KvsServer実装
  6. main&使ってみる

ここでは、new-typeを用いた型のラッピングを含んでいる。


src/kvs/*に配置するコードを記載する。

具体的に、ここまでで実装してきたパーツを統合するKVSサーバの実装をする。



KvsServerでは、ビルダーパターンに近いパラメータ設定を提供し、 start(&mut self)によってサーバを起動(portをbind)する。

現在のサーバ実装は、起動している時のみ値を保持してサーバ終了時にデータは揮発する仕組みで良いので、new()によってArc<RwLock<Store>>を初期化することとしている。 仮に永続化が必要となれば、その時に適切な初期化方法が必要となる。これをする場合は完全なbuilderパターンにすることが望ましいかもしれない。

listenerが新しいコネクションを受け取るたびに新たなtokio::spawn()によって非同期タスクとして処理し、複数のコネクションを受け入れる。

「リクエスト受信→Storeに対する処理→レスポンス送信」の処理の実態は process(KvsStream, Arc<RwLock<Store>>)というへルパ関数に実装した。 process()がロジックの統合の役割を担っている。

src/kvs/mod.rs
use std::sync::Arc;

use tokio::{net::TcpListener, sync::RwLock};

use crate::{error::KvsResult, handlers, query::Query, types::Store};

use self::stream::KvsStream;

mod response;
mod stream;

async fn process(mut kvs_stream: KvsStream, store: Arc<RwLock<Store>>) -> KvsResult<()> {
    loop {
        match kvs_stream.read().await {
            Err(e) => {
                println!("Error occured: {}", e);
                break;
            }
            Ok(input) => {
                if !input.is_empty() {
                    println!("{}", input);
                }
                let res = match Query::try_from(input.as_str()) {
                    Ok(query) => match query {
                        Query::Get(args) => handlers::get(store.clone(), args).await,
                        Query::Set(args) => handlers::set(store.clone(), args).await,
                    },
                    Err(msg) => Err(msg),
                };
                kvs_stream.write_result(res).await?;
            }
        }
    }
    Ok(())
}

pub struct KvsServer<'a> {
    store: Arc<RwLock<Store>>,
    addr: &'a str,
}

impl<'a> KvsServer<'a> {
    pub fn new() -> Self {
        Self {
            store: Arc::new(RwLock::new(Store::new())),
            addr: "0.0.0.0:8080",
        }
    }

    pub fn addr(mut self, addr: &'a str) -> Self {
        self.addr = addr;
        self
    }

    pub async fn start(&mut self) -> KvsResult<()> {
        let listener = TcpListener::bind(&self.addr).await?;
        loop {
            let (stream, addr) = listener.accept().await?;
            let kvs_stream = KvsStream(stream);
            tokio::spawn({
                let store = self.store.clone();
                async move { process(kvs_stream, store).await }
            });
            println!("socket connected from {}", addr);
        }
    }
}

KvsServerからのレスポンスを楽に生成するためのnew-type。

レスポンスはここでは文字列のみを扱うし、from<KvsResult<String>>を実装できればよいので、一番シンプルなnew-typeによる実装を選択した。

内部での発生したエラーを、レスポンスメッセージに変換する役割を担う。

src/kvs/response.rs
use crate::error::KvsResult;

/// Helper struct for KvsResponse
pub(super) struct KvsResponse(String);

impl From<KvsResult<String>> for KvsResponse {
    /// Kvs Response for TcpStream
    /// If command succeed, client get `Ok <res|msg>`
    /// If command failed, client get `Er <msg>`
    fn from(res: KvsResult<String>) -> Self {
        match res {
            Ok(msg) => Self(format!("Ok {msg}")),
            Err(msg) => Self(format!("Er {msg}")),
        }
    }
}

impl KvsResponse {
    pub(super) fn into_bytes(self) -> Vec<u8> {
        self.0.into_bytes()
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        error::{KvsError, KvsResult},
        kvs::response::KvsResponse,
    };

    #[test]
    fn ok_message() {
        let res: KvsResult<String> = Ok("msg".to_owned());
        let buf = KvsResponse::from(res).into_bytes();
        assert_eq!(buf, "Ok msg".as_bytes());
    }

    #[test]
    fn er_message() {
        let res: KvsResult<String> = Err(KvsError::InvalidQueryFormat);
        let buf = KvsResponse::from(res).into_bytes();
        assert_eq!(
            buf,
            "Er must to have space: <method> <key> <...args>".as_bytes()
        );
    }
}

tokio::net::TcpStreamを使うが、KvsServerでよりシンプルに使うためのラッパとしてKvsStream(TcpStream)を定義した。

ここでnew-typeを選択したのは、KvsStreamは単なる小さなTcpStreamのラッパとして使いのが背景で、構造体では気軽さが損なわれてしまうための判断に基づいている。

src/kvs/stream.rs
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
};

use crate::error::{KvsError, KvsResult};

use super::response::KvsResponse;

/// TcpStream Wrapper for Kvs
pub(super) struct KvsStream(pub(super) TcpStream);

impl KvsStream {
    pub async fn read(&mut self) -> KvsResult<String> {
        let mut buf = Vec::with_capacity(4096);
        match self.0.read_buf(&mut buf).await {
            Err(e) => Err(KvsError::StreamError(e)),
            Ok(0) => Err(KvsError::StreamDisconnected),
            Ok(_) => Ok(String::from_utf8(buf)?.trim().into()),
        }
    }

    /// Write result message to TcpStream
    pub async fn write_result(&mut self, res: KvsResult<String>) -> KvsResult<()> {
        let _ = self.0.write(&KvsResponse::from(res).into_bytes()).await?;
        Ok(())
    }
}

0.概要#おわりにに集約します。


0.概要#参考に集約します。

関連タグを探す