2023/11/04
目次
- 概要
- 型定義など
- handler定義
- Query定義
- KvsServer実装
- main&使ってみる
はじめに
ここでは、new-typeを用いた型のラッピングを含んでいる。
記載対象
src/kvs/*
に配置するコードを記載する。
具体的に、ここまでで実装してきたパーツを統合するKVSサーバの実装をする。
コード
src/kvs/mod.rs
KvsServer
では、ビルダーパターンに近いパラメータ設定を提供し、 start(&mut self)
によってサーバを起動(portをbind)する。
現在のサーバ実装は、起動している時のみ値を保持してサーバ終了時にデータは揮発する仕組みで良いので、new()
によってArc<RwLock<Store>>
を初期化することとしている。
仮に永続化が必要となれば、その時に適切な初期化方法が必要となる。これをする場合は完全なbuilderパターンにすることが望ましいかもしれない。
listenerが新しいコネクションを受け取るたびに新たなtokio::spawn()
によって非同期タスクとして処理し、複数のコネクションを受け入れる。
「リクエスト受信→Storeに対する処理→レスポンス送信」の処理の実態は process(KvsStream, Arc<RwLock<Store>>)
というへルパ関数に実装した。
process()
がロジックの統合の役割を担っている。
use std::sync::Arc;
use tokio::{net::TcpListener, sync::RwLock};
use crate::{error::KvsResult, handlers, query::Query, types::Store};
use self::stream::KvsStream;
mod response;
mod stream;
async fn process(mut kvs_stream: KvsStream, store: Arc<RwLock<Store>>) -> KvsResult<()> {
loop {
match kvs_stream.read().await {
Err(e) => {
println!("Error occured: {}", e);
break;
}
Ok(input) => {
if !input.is_empty() {
println!("{}", input);
}
let res = match Query::try_from(input.as_str()) {
Ok(query) => match query {
Query::Get(args) => handlers::get(store.clone(), args).await,
Query::Set(args) => handlers::set(store.clone(), args).await,
},
Err(msg) => Err(msg),
};
kvs_stream.write_result(res).await?;
}
}
}
Ok(())
}
pub struct KvsServer<'a> {
store: Arc<RwLock<Store>>,
addr: &'a str,
}
impl<'a> KvsServer<'a> {
pub fn new() -> Self {
Self {
store: Arc::new(RwLock::new(Store::new())),
addr: "0.0.0.0:8080",
}
}
pub fn addr(mut self, addr: &'a str) -> Self {
self.addr = addr;
self
}
pub async fn start(&mut self) -> KvsResult<()> {
let listener = TcpListener::bind(&self.addr).await?;
loop {
let (stream, addr) = listener.accept().await?;
let kvs_stream = KvsStream(stream);
tokio::spawn({
let store = self.store.clone();
async move { process(kvs_stream, store).await }
});
println!("socket connected from {}", addr);
}
}
}
src/kvs/response.rs
KvsServerからのレスポンスを楽に生成するためのnew-type。
レスポンスはここでは文字列のみを扱うし、from<KvsResult<String>>
を実装できればよいので、一番シンプルなnew-typeによる実装を選択した。
内部での発生したエラーを、レスポンスメッセージに変換する役割を担う。
use crate::error::KvsResult;
/// Helper struct for KvsResponse
pub(super) struct KvsResponse(String);
impl From<KvsResult<String>> for KvsResponse {
/// Kvs Response for TcpStream
/// If command succeed, client get `Ok <res|msg>`
/// If command failed, client get `Er <msg>`
fn from(res: KvsResult<String>) -> Self {
match res {
Ok(msg) => Self(format!("Ok {msg}")),
Err(msg) => Self(format!("Er {msg}")),
}
}
}
impl KvsResponse {
pub(super) fn into_bytes(self) -> Vec<u8> {
self.0.into_bytes()
}
}
#[cfg(test)]
mod tests {
use crate::{
error::{KvsError, KvsResult},
kvs::response::KvsResponse,
};
#[test]
fn ok_message() {
let res: KvsResult<String> = Ok("msg".to_owned());
let buf = KvsResponse::from(res).into_bytes();
assert_eq!(buf, "Ok msg".as_bytes());
}
#[test]
fn er_message() {
let res: KvsResult<String> = Err(KvsError::InvalidQueryFormat);
let buf = KvsResponse::from(res).into_bytes();
assert_eq!(
buf,
"Er must to have space: <method> <key> <...args>".as_bytes()
);
}
}
src/kvs/stream.rs
tokio::net::TcpStream
を使うが、KvsServerでよりシンプルに使うためのラッパとしてKvsStream(TcpStream)
を定義した。
ここでnew-typeを選択したのは、KvsStreamは単なる小さなTcpStreamのラッパとして使いのが背景で、構造体では気軽さが損なわれてしまうための判断に基づいている。
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use crate::error::{KvsError, KvsResult};
use super::response::KvsResponse;
/// TcpStream Wrapper for Kvs
pub(super) struct KvsStream(pub(super) TcpStream);
impl KvsStream {
pub async fn read(&mut self) -> KvsResult<String> {
let mut buf = Vec::with_capacity(4096);
match self.0.read_buf(&mut buf).await {
Err(e) => Err(KvsError::StreamError(e)),
Ok(0) => Err(KvsError::StreamDisconnected),
Ok(_) => Ok(String::from_utf8(buf)?.trim().into()),
}
}
/// Write result message to TcpStream
pub async fn write_result(&mut self, res: KvsResult<String>) -> KvsResult<()> {
let _ = self.0.write(&KvsResponse::from(res).into_bytes()).await?;
Ok(())
}
}
おわりに
0.概要#おわりにに集約します。
参考
0.概要#参考に集約します。