mirror of
https://github.com/dani-garcia/vaultwarden.git
synced 2025-11-03 03:38:21 +02:00
Abstract persistent files through Apache OpenDAL (#5626)
* Abstract file access through Apache OpenDAL * Add AWS S3 support via OpenDAL for data files * PR improvements * Additional PR improvements * Config setting comments for local/remote data locations
This commit is contained in:
@@ -11,10 +11,11 @@ use rocket::{
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::auth::ClientVersion;
|
||||
use crate::util::NumberOrString;
|
||||
use crate::util::{save_temp_file, NumberOrString};
|
||||
use crate::{
|
||||
api::{self, core::log_event, EmptyResult, JsonResult, Notify, PasswordOrOtpData, UpdateType},
|
||||
auth::Headers,
|
||||
config::PathType,
|
||||
crypto,
|
||||
db::{models::*, DbConn, DbPool},
|
||||
CONFIG,
|
||||
@@ -105,12 +106,7 @@ struct SyncData {
|
||||
}
|
||||
|
||||
#[get("/sync?<data..>")]
|
||||
async fn sync(
|
||||
data: SyncData,
|
||||
headers: Headers,
|
||||
client_version: Option<ClientVersion>,
|
||||
mut conn: DbConn,
|
||||
) -> Json<Value> {
|
||||
async fn sync(data: SyncData, headers: Headers, client_version: Option<ClientVersion>, mut conn: DbConn) -> JsonResult {
|
||||
let user_json = headers.user.to_json(&mut conn).await;
|
||||
|
||||
// Get all ciphers which are visible by the user
|
||||
@@ -134,7 +130,7 @@ async fn sync(
|
||||
for c in ciphers {
|
||||
ciphers_json.push(
|
||||
c.to_json(&headers.host, &headers.user.uuid, Some(&cipher_sync_data), CipherSyncType::User, &mut conn)
|
||||
.await,
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -159,7 +155,7 @@ async fn sync(
|
||||
api::core::_get_eq_domains(headers, true).into_inner()
|
||||
};
|
||||
|
||||
Json(json!({
|
||||
Ok(Json(json!({
|
||||
"profile": user_json,
|
||||
"folders": folders_json,
|
||||
"collections": collections_json,
|
||||
@@ -168,11 +164,11 @@ async fn sync(
|
||||
"domains": domains_json,
|
||||
"sends": sends_json,
|
||||
"object": "sync"
|
||||
}))
|
||||
})))
|
||||
}
|
||||
|
||||
#[get("/ciphers")]
|
||||
async fn get_ciphers(headers: Headers, mut conn: DbConn) -> Json<Value> {
|
||||
async fn get_ciphers(headers: Headers, mut conn: DbConn) -> JsonResult {
|
||||
let ciphers = Cipher::find_by_user_visible(&headers.user.uuid, &mut conn).await;
|
||||
let cipher_sync_data = CipherSyncData::new(&headers.user.uuid, CipherSyncType::User, &mut conn).await;
|
||||
|
||||
@@ -180,15 +176,15 @@ async fn get_ciphers(headers: Headers, mut conn: DbConn) -> Json<Value> {
|
||||
for c in ciphers {
|
||||
ciphers_json.push(
|
||||
c.to_json(&headers.host, &headers.user.uuid, Some(&cipher_sync_data), CipherSyncType::User, &mut conn)
|
||||
.await,
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
|
||||
Json(json!({
|
||||
Ok(Json(json!({
|
||||
"data": ciphers_json,
|
||||
"object": "list",
|
||||
"continuationToken": null
|
||||
}))
|
||||
})))
|
||||
}
|
||||
|
||||
#[get("/ciphers/<cipher_id>")]
|
||||
@@ -201,7 +197,7 @@ async fn get_cipher(cipher_id: CipherId, headers: Headers, mut conn: DbConn) ->
|
||||
err!("Cipher is not owned by user")
|
||||
}
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?))
|
||||
}
|
||||
|
||||
#[get("/ciphers/<cipher_id>/admin")]
|
||||
@@ -339,7 +335,7 @@ async fn post_ciphers(data: Json<CipherData>, headers: Headers, mut conn: DbConn
|
||||
let mut cipher = Cipher::new(data.r#type, data.name.clone());
|
||||
update_cipher_from_data(&mut cipher, data, &headers, None, &mut conn, &nt, UpdateType::SyncCipherCreate).await?;
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?))
|
||||
}
|
||||
|
||||
/// Enforces the personal ownership policy on user-owned ciphers, if applicable.
|
||||
@@ -676,7 +672,7 @@ async fn put_cipher(
|
||||
|
||||
update_cipher_from_data(&mut cipher, data, &headers, None, &mut conn, &nt, UpdateType::SyncCipherUpdate).await?;
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?))
|
||||
}
|
||||
|
||||
#[post("/ciphers/<cipher_id>/partial", data = "<data>")]
|
||||
@@ -714,7 +710,7 @@ async fn put_cipher_partial(
|
||||
// Update favorite
|
||||
cipher.set_favorite(Some(data.favorite), &headers.user.uuid, &mut conn).await?;
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -825,7 +821,7 @@ async fn post_collections_update(
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?))
|
||||
}
|
||||
|
||||
#[put("/ciphers/<cipher_id>/collections-admin", data = "<data>")]
|
||||
@@ -1030,7 +1026,7 @@ async fn share_cipher_by_uuid(
|
||||
|
||||
update_cipher_from_data(&mut cipher, data.cipher, headers, Some(shared_to_collections), conn, nt, ut).await?;
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, conn).await?))
|
||||
}
|
||||
|
||||
/// v2 API for downloading an attachment. This just redirects the client to
|
||||
@@ -1055,7 +1051,7 @@ async fn get_attachment(
|
||||
}
|
||||
|
||||
match Attachment::find_by_id(&attachment_id, &mut conn).await {
|
||||
Some(attachment) if cipher_id == attachment.cipher_uuid => Ok(Json(attachment.to_json(&headers.host))),
|
||||
Some(attachment) if cipher_id == attachment.cipher_uuid => Ok(Json(attachment.to_json(&headers.host).await?)),
|
||||
Some(_) => err!("Attachment doesn't belong to cipher"),
|
||||
None => err!("Attachment doesn't exist"),
|
||||
}
|
||||
@@ -1116,7 +1112,7 @@ async fn post_attachment_v2(
|
||||
"attachmentId": attachment_id,
|
||||
"url": url,
|
||||
"fileUploadType": FileUploadType::Direct as i32,
|
||||
response_key: cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await,
|
||||
response_key: cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?,
|
||||
})))
|
||||
}
|
||||
|
||||
@@ -1142,7 +1138,7 @@ async fn save_attachment(
|
||||
mut conn: DbConn,
|
||||
nt: Notify<'_>,
|
||||
) -> Result<(Cipher, DbConn), crate::error::Error> {
|
||||
let mut data = data.into_inner();
|
||||
let data = data.into_inner();
|
||||
|
||||
let Some(size) = data.data.len().to_i64() else {
|
||||
err!("Attachment data size overflow");
|
||||
@@ -1269,13 +1265,7 @@ async fn save_attachment(
|
||||
attachment.save(&mut conn).await.expect("Error saving attachment");
|
||||
}
|
||||
|
||||
let folder_path = tokio::fs::canonicalize(&CONFIG.attachments_folder()).await?.join(cipher_id.as_ref());
|
||||
let file_path = folder_path.join(file_id.as_ref());
|
||||
tokio::fs::create_dir_all(&folder_path).await?;
|
||||
|
||||
if let Err(_err) = data.data.persist_to(&file_path).await {
|
||||
data.data.move_copy_to(file_path).await?
|
||||
}
|
||||
save_temp_file(PathType::Attachments, &format!("{cipher_id}/{file_id}"), data.data, true).await?;
|
||||
|
||||
nt.send_cipher_update(
|
||||
UpdateType::SyncCipherUpdate,
|
||||
@@ -1342,7 +1332,7 @@ async fn post_attachment(
|
||||
|
||||
let (cipher, mut conn) = save_attachment(attachment, cipher_id, data, &headers, conn, nt).await?;
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, &mut conn).await?))
|
||||
}
|
||||
|
||||
#[post("/ciphers/<cipher_id>/attachment-admin", format = "multipart/form-data", data = "<data>")]
|
||||
@@ -1786,7 +1776,7 @@ async fn _restore_cipher_by_uuid(
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, conn).await))
|
||||
Ok(Json(cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, conn).await?))
|
||||
}
|
||||
|
||||
async fn _restore_multiple_ciphers(
|
||||
@@ -1859,7 +1849,7 @@ async fn _delete_cipher_attachment_by_id(
|
||||
)
|
||||
.await;
|
||||
}
|
||||
let cipher_json = cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, conn).await;
|
||||
let cipher_json = cipher.to_json(&headers.host, &headers.user.uuid, None, CipherSyncType::User, conn).await?;
|
||||
Ok(Json(json!({"cipher":cipher_json})))
|
||||
}
|
||||
|
||||
|
||||
@@ -582,7 +582,7 @@ async fn view_emergency_access(emer_id: EmergencyAccessId, headers: Headers, mut
|
||||
CipherSyncType::User,
|
||||
&mut conn,
|
||||
)
|
||||
.await,
|
||||
.await?,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -917,21 +917,26 @@ async fn get_org_details(data: OrgIdData, headers: OrgMemberHeaders, mut conn: D
|
||||
}
|
||||
|
||||
Ok(Json(json!({
|
||||
"data": _get_org_details(&data.organization_id, &headers.host, &headers.user.uuid, &mut conn).await,
|
||||
"data": _get_org_details(&data.organization_id, &headers.host, &headers.user.uuid, &mut conn).await?,
|
||||
"object": "list",
|
||||
"continuationToken": null,
|
||||
})))
|
||||
}
|
||||
|
||||
async fn _get_org_details(org_id: &OrganizationId, host: &str, user_id: &UserId, conn: &mut DbConn) -> Value {
|
||||
async fn _get_org_details(
|
||||
org_id: &OrganizationId,
|
||||
host: &str,
|
||||
user_id: &UserId,
|
||||
conn: &mut DbConn,
|
||||
) -> Result<Value, crate::Error> {
|
||||
let ciphers = Cipher::find_by_org(org_id, conn).await;
|
||||
let cipher_sync_data = CipherSyncData::new(user_id, CipherSyncType::Organization, conn).await;
|
||||
|
||||
let mut ciphers_json = Vec::with_capacity(ciphers.len());
|
||||
for c in ciphers {
|
||||
ciphers_json.push(c.to_json(host, user_id, Some(&cipher_sync_data), CipherSyncType::Organization, conn).await);
|
||||
ciphers_json.push(c.to_json(host, user_id, Some(&cipher_sync_data), CipherSyncType::Organization, conn).await?);
|
||||
}
|
||||
json!(ciphers_json)
|
||||
Ok(json!(ciphers_json))
|
||||
}
|
||||
|
||||
#[derive(FromForm)]
|
||||
@@ -3372,7 +3377,7 @@ async fn get_org_export(org_id: OrganizationId, headers: AdminHeaders, mut conn:
|
||||
|
||||
Ok(Json(json!({
|
||||
"collections": convert_json_key_lcase_first(_get_org_collections(&org_id, &mut conn).await),
|
||||
"ciphers": convert_json_key_lcase_first(_get_org_details(&org_id, &headers.host, &headers.user.uuid, &mut conn).await),
|
||||
"ciphers": convert_json_key_lcase_first(_get_org_details(&org_id, &headers.host, &headers.user.uuid, &mut conn).await?),
|
||||
})))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, TimeDelta, Utc};
|
||||
use num_traits::ToPrimitive;
|
||||
@@ -12,8 +13,9 @@ use serde_json::Value;
|
||||
use crate::{
|
||||
api::{ApiResult, EmptyResult, JsonResult, Notify, UpdateType},
|
||||
auth::{ClientIp, Headers, Host},
|
||||
config::PathType,
|
||||
db::{models::*, DbConn, DbPool},
|
||||
util::NumberOrString,
|
||||
util::{save_temp_file, NumberOrString},
|
||||
CONFIG,
|
||||
};
|
||||
|
||||
@@ -228,7 +230,7 @@ async fn post_send_file(data: Form<UploadData<'_>>, headers: Headers, mut conn:
|
||||
|
||||
let UploadData {
|
||||
model,
|
||||
mut data,
|
||||
data,
|
||||
} = data.into_inner();
|
||||
let model = model.into_inner();
|
||||
|
||||
@@ -268,13 +270,8 @@ async fn post_send_file(data: Form<UploadData<'_>>, headers: Headers, mut conn:
|
||||
}
|
||||
|
||||
let file_id = crate::crypto::generate_send_file_id();
|
||||
let folder_path = tokio::fs::canonicalize(&CONFIG.sends_folder()).await?.join(&send.uuid);
|
||||
let file_path = folder_path.join(&file_id);
|
||||
tokio::fs::create_dir_all(&folder_path).await?;
|
||||
|
||||
if let Err(_err) = data.persist_to(&file_path).await {
|
||||
data.move_copy_to(file_path).await?
|
||||
}
|
||||
save_temp_file(PathType::Sends, &format!("{}/{file_id}", send.uuid), data, true).await?;
|
||||
|
||||
let mut data_value: Value = serde_json::from_str(&send.data)?;
|
||||
if let Some(o) = data_value.as_object_mut() {
|
||||
@@ -381,7 +378,7 @@ async fn post_send_file_v2_data(
|
||||
) -> EmptyResult {
|
||||
enforce_disable_send_policy(&headers, &mut conn).await?;
|
||||
|
||||
let mut data = data.into_inner();
|
||||
let data = data.into_inner();
|
||||
|
||||
let Some(send) = Send::find_by_uuid_and_user(&send_id, &headers.user.uuid, &mut conn).await else {
|
||||
err!("Send not found. Unable to save the file.", "Invalid send uuid or does not belong to user.")
|
||||
@@ -424,19 +421,9 @@ async fn post_send_file_v2_data(
|
||||
err!("Send file size does not match.", format!("Expected a file size of {} got {size}", send_data.size));
|
||||
}
|
||||
|
||||
let folder_path = tokio::fs::canonicalize(&CONFIG.sends_folder()).await?.join(send_id);
|
||||
let file_path = folder_path.join(file_id);
|
||||
let file_path = format!("{send_id}/{file_id}");
|
||||
|
||||
// Check if the file already exists, if that is the case do not overwrite it
|
||||
if tokio::fs::metadata(&file_path).await.is_ok() {
|
||||
err!("Send file has already been uploaded.", format!("File {file_path:?} already exists"))
|
||||
}
|
||||
|
||||
tokio::fs::create_dir_all(&folder_path).await?;
|
||||
|
||||
if let Err(_err) = data.data.persist_to(&file_path).await {
|
||||
data.data.move_copy_to(file_path).await?
|
||||
}
|
||||
save_temp_file(PathType::Sends, &file_path, data.data, false).await?;
|
||||
|
||||
nt.send_send_update(
|
||||
UpdateType::SyncSendCreate,
|
||||
@@ -569,15 +556,26 @@ async fn post_access_file(
|
||||
)
|
||||
.await;
|
||||
|
||||
let token_claims = crate::auth::generate_send_claims(&send_id, &file_id);
|
||||
let token = crate::auth::encode_jwt(&token_claims);
|
||||
Ok(Json(json!({
|
||||
"object": "send-fileDownload",
|
||||
"id": file_id,
|
||||
"url": format!("{}/api/sends/{send_id}/{file_id}?t={token}", &host.host)
|
||||
"url": download_url(&host, &send_id, &file_id).await?,
|
||||
})))
|
||||
}
|
||||
|
||||
async fn download_url(host: &Host, send_id: &SendId, file_id: &SendFileId) -> Result<String, crate::Error> {
|
||||
let operator = CONFIG.opendal_operator_for_path_type(PathType::Sends)?;
|
||||
|
||||
if operator.info().scheme() == opendal::Scheme::Fs {
|
||||
let token_claims = crate::auth::generate_send_claims(send_id, file_id);
|
||||
let token = crate::auth::encode_jwt(&token_claims);
|
||||
|
||||
Ok(format!("{}/api/sends/{send_id}/{file_id}?t={token}", &host.host))
|
||||
} else {
|
||||
Ok(operator.presign_read(&format!("{send_id}/{file_id}"), Duration::from_secs(5 * 60)).await?.uri().to_string())
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/sends/<send_id>/<file_id>?<t>")]
|
||||
async fn download_send(send_id: SendId, file_id: SendFileId, t: &str) -> Option<NamedFile> {
|
||||
if let Ok(claims) = crate::auth::decode_send(t) {
|
||||
|
||||
@@ -261,7 +261,7 @@ pub(crate) async fn get_duo_keys_email(email: &str, conn: &mut DbConn) -> ApiRes
|
||||
}
|
||||
.map_res("Can't fetch Duo Keys")?;
|
||||
|
||||
Ok((data.ik, data.sk, CONFIG.get_duo_akey(), data.host))
|
||||
Ok((data.ik, data.sk, CONFIG.get_duo_akey().await, data.host))
|
||||
}
|
||||
|
||||
pub async fn generate_duo_signature(email: &str, conn: &mut DbConn) -> ApiResult<(String, String)> {
|
||||
|
||||
Reference in New Issue
Block a user