Add endpoints for search and pagination

also add non-blocking logger
This commit is contained in:
Rohan Sircar 2021-05-18 19:43:18 +05:30
parent 0c64225bcf
commit c993ef87ee
12 changed files with 305 additions and 58 deletions

33
Cargo.lock generated
View File

@ -74,6 +74,7 @@ dependencies = [
"timeago",
"tracing",
"tracing-actix-web",
"tracing-appender",
"tracing-bunyan-formatter",
"tracing-futures",
"tracing-log",
@ -837,6 +838,27 @@ dependencies = [
"cfg-if 1.0.0",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4feb231f0d4d6af81aed15928e58ecf5816aa62a2393e2c82f46973e92a9a278"
dependencies = [
"autocfg 1.0.1",
"cfg-if 1.0.0",
"lazy_static",
]
[[package]]
name = "crypto-mac"
version = "0.10.0"
@ -2861,6 +2883,17 @@ dependencies = [
"uuid",
]
[[package]]
name = "tracing-appender"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a"
dependencies = [
"chrono",
"crossbeam-channel",
"tracing-subscriber",
]
[[package]]
name = "tracing-attributes"
version = "0.1.15"

View File

@ -42,6 +42,7 @@ diesel-tracing = { version = "0.1.4", features = ["sqlite"] }
validators = "0.22.5"
diesel-derive-newtype = "0.1"
derive_more = "0.99.13"
tracing-appender = "0.1.2"
[dependencies.validators-derive]
version = "0.22.5"

View File

@ -1,12 +1,12 @@
use diesel::prelude::*;
use crate::models;
use crate::models::{self, Pagination, UserId};
use crate::{errors, models::Password};
use bcrypt::{hash, verify, DEFAULT_COST};
use validators::prelude::*;
pub fn find_user_by_uid(
uid: i32,
uid: &UserId,
conn: &impl diesel::Connection<Backend = diesel::sqlite::Sqlite>,
) -> Result<Option<models::User>, errors::DomainError> {
use crate::schema::users::dsl::*;
@ -42,6 +42,46 @@ pub fn get_all(
.load::<models::User>(conn)?)
}
// def findAll(userId: Long, limit: Int, offset: Int) = db.run {
// for {
// comments <- query.filter(_.creatorId === userId)
// .sortBy(_.createdAt)
// .drop(offset).take(limit)
// .result
// numberOfComments <- query.filter(_.creatorId === userId).length.result
// } yield PaginatedResult(
// totalCount = numberOfComments,
// entities = comments.toList,
// hasNextPage = numberOfComments - (offset + limit) > 0
// )
// }
pub fn get_users_paginated(
// user_id: UserId,
pagination: &Pagination,
conn: &impl diesel::Connection<Backend = diesel::sqlite::Sqlite>,
) -> Result<Vec<models::User>, errors::DomainError> {
// use crate::schema::users::dsl::*;
Ok(query::_paginate_result(&pagination).load::<models::User>(conn)?)
}
pub fn search_users(
query: &str,
pagination: &Pagination,
conn: &impl diesel::Connection<Backend = diesel::sqlite::Sqlite>,
) -> Result<Vec<models::User>, errors::DomainError> {
use crate::schema::users::dsl::*;
// Ok(users
// .filter(name.like(format!("%{}%", query)))
// .order_by(created_at)
// .offset(pagination.calc_offset().as_uint().into())
// .limit(pagination.limit.as_uint().into())
// .load::<models::User>(conn)?)
Ok(query::_paginate_result(&pagination)
.filter(name.like(format!("%{}%", query)))
.load::<models::User>(conn)?)
}
pub fn insert_new_user(
nu: models::NewUser,
conn: &impl diesel::Connection<Backend = diesel::sqlite::Sqlite>,
@ -81,7 +121,7 @@ pub fn verify_password(
}
mod query {
use diesel::prelude::*;
use super::*;
use diesel::sql_types::Integer;
use diesel::sql_types::Text;
use diesel::sql_types::Timestamp;
@ -91,11 +131,19 @@ mod query {
type Query<'a, B, T> = crate::schema::users::BoxedQuery<'a, B, T>;
pub fn _get_user_by_name(
) -> Query<'static, Sqlite, (Integer, Text, Text, Timestamp)> {
use crate::schema::users::dsl::*;
users.into_boxed()
}
pub fn _paginate_result(
pagination: &Pagination,
) -> Query<'static, Sqlite, (Integer, Text, Text, Timestamp)> {
use crate::schema::users::dsl::*;
users
.select(users::all_columns())
// .filter(name.eq(user_name))
.order_by(created_at)
.offset(pagination.calc_offset().as_uint().into())
.limit(pagination.limit.as_uint().into())
.into_boxed()
}
}

View File

@ -42,12 +42,12 @@ impl ResponseError for DomainError {
.json(ApiResponse::failure(err.to_string()))
}
DomainError::DbError { source: _ } => {
tracing::error!("{}", err);
let _ = tracing::error!("{}", err);
HttpResponse::InternalServerError()
.json(ApiResponse::failure("Error in database".to_owned()))
}
DomainError::DbPoolError { source: _ } => {
tracing::error!("{}", err);
let _ = tracing::error!("{}", err);
HttpResponse::InternalServerError().json(ApiResponse::failure(
"Error getting database pool".to_owned(),
))
@ -61,7 +61,7 @@ impl ResponseError for DomainError {
.json(ApiResponse::failure(err.to_string()))
}
DomainError::ThreadPoolError { message: _ } => {
tracing::error!("{}", err);
let _ = tracing::error!("{}", err);
HttpResponse::InternalServerError().json(ApiResponse::failure(
"Thread pool error occurred".to_owned(),
))
@ -69,7 +69,7 @@ impl ResponseError for DomainError {
DomainError::AuthError { message: _ } => HttpResponse::Forbidden()
.json(ApiResponse::failure(err.to_string())),
DomainError::FieldValidationError { message: _ } => {
tracing::error!("{}", err);
let _ = tracing::error!("{}", err);
HttpResponse::BadRequest()
.json(ApiResponse::failure(err.to_string()))
}

View File

@ -73,11 +73,19 @@ pub fn configure_app(app_data: AppData) -> Box<dyn Fn(&mut ServiceConfig)> {
"",
web::get().to(routes::users::get_all_users),
)
.route(
"/search",
web::get().to(routes::users::search_users),
)
.route("", web::post().to(routes::users::add_user))
.route(
"/{user_id}",
web::get().to(routes::users::get_user),
)
.route("", web::post().to(routes::users::add_user)),
),
)
.route(
"/pagination",
web::get().to(routes::users::get_users_paginated),
)
.route(
"/build-info",
@ -108,7 +116,11 @@ pub fn id_service(
pub async fn run(addr: String, app_data: AppData) -> io::Result<()> {
let bi = get_build_info();
tracing::info!("Starting {} {}", bi.crate_info.name, bi.crate_info.version);
let _ = tracing::info!(
"Starting {} {}",
bi.crate_info.name,
bi.crate_info.version
);
println!(
r#"
__ .__ .___

View File

@ -78,6 +78,9 @@ pub fn setup_logger(format: LoggerFormat) -> io::Result<()> {
)
})?;
let (non_blocking, _guard) =
tracing_appender::non_blocking(std::io::stdout());
let _ = LogTracer::init().map_err(|err| {
io::Error::new(
ErrorKind::Other,
@ -91,8 +94,8 @@ pub fn setup_logger(format: LoggerFormat) -> io::Result<()> {
LoggerFormat::Json => {
let formatting_layer = BunyanFormattingLayer::new(
format!("actix-demo-{}", bi.crate_info.version),
// Output the formatted spans to stdout.
std::io::stdout,
// Output the formatted spans to non-blocking writer
non_blocking,
);
let subscriber = Registry::default()
.with(env_filter)

View File

@ -1,3 +1 @@
pub mod csrf;
pub use self::csrf::*;

View File

@ -7,6 +7,9 @@ use std::convert::TryFrom;
use std::{convert::TryInto, str::FromStr};
use validators::prelude::*;
///newtype to constrain id to positive int values
///
///sqlite does not allow u32 for primary keys
#[derive(
Debug,
Clone,
@ -34,14 +37,13 @@ impl FromStr for UserId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Ok(num) = s.parse::<u32>() {
(num as u32)
.try_into()
num.try_into()
.map_err(|err| {
format!("error while converting user_id: {}", err)
format!("negative values are not allowed: {}", err)
})
.map(UserId)
} else {
Err("negative values are not allowed".to_owned())
Err("expected unsigned int, received string".to_owned())
}
}
}
@ -91,6 +93,94 @@ pub struct NewUser {
pub password: Password,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(try_from = "u16")]
pub struct PaginationOffset(u16);
impl PaginationOffset {
pub fn as_uint(&self) -> u16 {
self.0
}
}
impl TryFrom<u16> for PaginationOffset {
type Error = String;
fn try_from(value: u16) -> Result<Self, Self::Error> {
if value <= 2500 {
Ok(PaginationOffset(value))
} else {
Err("Failed to validate".to_owned())
}
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(try_from = "u16")]
pub struct PaginationLimit(u16);
impl PaginationLimit {
pub fn as_uint(&self) -> u16 {
self.0
}
}
impl TryFrom<u16> for PaginationLimit {
type Error = String;
fn try_from(value: u16) -> Result<Self, Self::Error> {
if value <= 50 {
Ok(PaginationLimit(value))
} else {
Err("Failed to validate".to_owned())
}
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(try_from = "u16")]
pub struct PaginationPage(u16);
impl PaginationPage {
pub fn as_uint(&self) -> u16 {
self.0
}
}
impl TryFrom<u16> for PaginationPage {
type Error = String;
fn try_from(value: u16) -> Result<Self, Self::Error> {
if value <= 50 {
Ok(PaginationPage(value))
} else {
Err("Failed to validate".to_owned())
}
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct Pagination {
pub page: PaginationPage,
pub limit: PaginationLimit,
}
impl Pagination {
pub fn calc_offset(&self) -> PaginationOffset {
let res = self.page.as_uint() * self.limit.as_uint();
PaginationOffset::try_from(res).unwrap()
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct SearchQuery(String);
impl SearchQuery {
pub fn as_str(&self) -> &str {
&self.0
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct UserSearchRequest {
pub q: SearchQuery,
// pub pagination: Pagination
}
#[cfg(test)]
mod test {
use super::*;
@ -127,4 +217,18 @@ mod test {
);
assert_eq!(mb_user.is_ok(), false);
}
#[test]
fn pagination_refinement_test() {
let mb_pag =
serde_json::from_str::<Pagination>(r#"{"limit":5,"page":5}"#);
// println!("{:?}", mb_pag);
assert_eq!(mb_pag.is_ok(), true);
let mb_pag =
serde_json::from_str::<Pagination>(r#"{"limit":51,"page":5}"#);
assert_eq!(mb_pag.is_ok(), false);
let mb_pag =
serde_json::from_str::<Pagination>(r#"{"limit":5,"page":51}"#);
assert_eq!(mb_pag.is_ok(), false);
}
}

View File

@ -46,7 +46,7 @@ pub async fn logout(
) -> Result<HttpResponse, Error> {
let maybe_identity = id.identity();
let response = if let Some(identity) = maybe_identity {
tracing::info!("Logging out {user}", user = identity);
let _ = tracing::info!("Logging out {user}", user = identity);
id.forget();
HttpResponse::Found().header("location", "/").finish()
} else {

View File

@ -2,7 +2,7 @@ use actix_web::{web, HttpResponse};
use crate::{
actions,
models::{self, ApiResponse},
models::{self, ApiResponse, Pagination, UserId, UserSearchRequest},
};
use crate::{errors::DomainError, AppData};
use actix_web::error::ResponseError;
@ -17,19 +17,20 @@ use actix_web::error::ResponseError;
)]
pub async fn get_user(
app_data: web::Data<AppData>,
user_id: web::Path<i32>,
user_id: web::Path<UserId>,
) -> Result<HttpResponse, DomainError> {
let u_id = user_id.into_inner();
tracing::info!("Getting user with id {}", u_id);
let u_id2 = u_id.clone();
let _ = tracing::info!("Getting user with id {}", u_id);
// use web::block to offload blocking Diesel code without blocking server thread
let res = web::block(move || {
let pool = &app_data.pool;
let conn = pool.get()?;
actions::find_user_by_uid(u_id, &conn)
actions::find_user_by_uid(&u_id2, &conn)
})
.await
.map_err(|err| DomainError::new_thread_pool_error(err.to_string()))?;
tracing::trace!("{:?}", res);
let _ = tracing::trace!("{:?}", res);
if let Some(user) = res {
Ok(HttpResponse::Ok().json(ApiResponse::successful(user)))
} else {
@ -73,7 +74,7 @@ pub async fn get_all_users(
.await
.map_err(|err| DomainError::new_thread_pool_error(err.to_string()))?;
tracing::trace!("{:?}", users);
let _ = tracing::trace!("{:?}", users);
if !users.is_empty() {
Ok(HttpResponse::Ok().json(ApiResponse::successful(users)))
@ -84,6 +85,47 @@ pub async fn get_all_users(
}
}
#[tracing::instrument(level = "debug", skip(app_data))]
pub async fn get_users_paginated(
app_data: web::Data<AppData>,
pagination: web::Query<Pagination>,
) -> Result<HttpResponse, DomainError> {
let _ = tracing::info!("Paginated users request");
let users = web::block(move || {
let pool = &app_data.pool;
let conn = pool.get()?;
let p: Pagination = pagination.into_inner();
actions::get_users_paginated(&p, &conn)
})
.await
.map_err(|err| DomainError::new_thread_pool_error(err.to_string()))?;
let _ = tracing::trace!("{:?}", users);
Ok(HttpResponse::Ok().json(ApiResponse::successful(users)))
}
#[tracing::instrument(level = "debug", skip(app_data))]
pub async fn search_users(
app_data: web::Data<AppData>,
query: web::Query<UserSearchRequest>,
pagination: web::Query<Pagination>,
) -> Result<HttpResponse, DomainError> {
let _ = tracing::info!("Search users request");
let users = web::block(move || {
let pool = &app_data.pool;
let conn = pool.get()?;
let p: Pagination = pagination.into_inner();
actions::search_users(query.q.as_str(), &p, &conn)
})
.await
.map_err(|err| DomainError::new_thread_pool_error(err.to_string()))?;
let _ = tracing::trace!("{:?}", users);
Ok(HttpResponse::Ok().json(ApiResponse::successful(users)))
}
/// Inserts new user with name defined in form.
#[tracing::instrument(level = "debug", skip(app_data))]
pub async fn add_user(
@ -98,7 +140,7 @@ pub async fn add_user(
})
.await
.map(|user| {
tracing::trace!("{:?}", user);
HttpResponse::Ok().json(ApiResponse::successful(user))
let _ = tracing::trace!("{:?}", user);
HttpResponse::Created().json(ApiResponse::successful(user))
})
}

View File

@ -15,7 +15,7 @@ mod tests {
let resp = common::test_app().await.unwrap().call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let body: build_info::BuildInfo = test::read_body_json(resp).await;
tracing::debug!("{:?}", body);
let _ = tracing::debug!("{:?}", body);
assert_eq!(body, *get_build_info());
}
}

View File

@ -8,34 +8,40 @@ mod tests {
use actix_web::http::StatusCode;
use actix_web::test;
#[actix_rt::test]
async fn get_users_api_should_return_error_message_if_no_users_exist() {
let req = test::TestRequest::get().uri("/api/users").to_request();
let resp = common::test_app().await.unwrap().call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let body: ApiResponse<String> = test::read_body_json(resp).await;
tracing::debug!("{:?}", body);
assert_eq!(
body,
ApiResponse::failure(
"Entity does not exist - No users available".to_owned()
)
);
}
mod get_users_api {
use super::*;
#[actix_rt::test]
async fn get_user_api_should_return_error_message_if_user_with_id_does_not_exist(
) {
let req = test::TestRequest::get().uri("/api/users/1").to_request();
let resp = common::test_app().await.unwrap().call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let body: ApiResponse<String> = test::read_body_json(resp).await;
tracing::debug!("{:?}", body);
assert_eq!(
body,
ApiResponse::failure(
"Entity does not exist - No user found with uid: 1".to_owned()
)
);
#[actix_rt::test]
async fn should_return_error_message_if_no_users_exist() {
let req = test::TestRequest::get().uri("/api/users").to_request();
let resp =
common::test_app().await.unwrap().call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let body: ApiResponse<String> = test::read_body_json(resp).await;
let _ = tracing::debug!("{:?}", body);
assert_eq!(
body,
ApiResponse::failure(
"Entity does not exist - No users available".to_owned()
)
);
}
#[actix_rt::test]
async fn should_return_error_message_if_user_with_id_does_not_exist() {
let req = test::TestRequest::get().uri("/api/users/1").to_request();
let resp =
common::test_app().await.unwrap().call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
let body: ApiResponse<String> = test::read_body_json(resp).await;
let _ = tracing::debug!("{:?}", body);
assert_eq!(
body,
ApiResponse::failure(
"Entity does not exist - No user found with uid: 1"
.to_owned()
)
);
}
}
}