From f977b61c5091b159f0f6d94512d53e4b9c8a7a06 Mon Sep 17 00:00:00 2001 From: Thomas Heartman Date: Wed, 15 Jan 2025 09:22:16 +0100 Subject: [PATCH] chore(1-3244): only expose streaming endpoint if in streaming mode (#663) Make it so that the /streaming endpoint only accepts connections if the server is in streaming mode. This effectively renders the /streaming endpoint useless if the server is not connected to an upstream stream. This first solution uses the 403 http status code and tells the user that the endpoint is only enabled in streaming mode. --- server/src/client_api.rs | 19 +++++++++++++++---- server/src/error.rs | 3 +++ server/tests/streaming_test.rs | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/server/src/client_api.rs b/server/src/client_api.rs index 3d8ef9d0..a19c9367 100644 --- a/server/src/client_api.rs +++ b/server/src/client_api.rs @@ -1,3 +1,4 @@ +use crate::cli::{EdgeArgs, EdgeMode}; use crate::error::EdgeError; use crate::feature_cache::FeatureCache; use crate::filters::{ @@ -45,12 +46,22 @@ pub async fn stream_features( edge_token: EdgeToken, broadcaster: Data, token_cache: Data>, + edge_mode: Data, filter_query: Query, ) -> EdgeResult { - let (validated_token, _filter_set, query) = - get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; + match edge_mode.get_ref() { + EdgeMode::Edge(EdgeArgs { + streaming: true, .. + }) => { + let (validated_token, _filter_set, query) = + get_feature_filter(&edge_token, &token_cache, filter_query.clone())?; - broadcaster.connect(validated_token, query).await + broadcaster.connect(validated_token, query).await + } + _ => Err(EdgeError::Forbidden( + "This endpoint is only enabled in streaming mode".into(), + )), + } } #[utoipa::path( @@ -299,7 +310,7 @@ mod tests { use crate::auth::token_validator::TokenValidator; use crate::cli::{OfflineArgs, TokenHeader}; - use crate::http::unleash_client::{UnleashClient, ClientMetaInformation}; + use crate::http::unleash_client::{ClientMetaInformation, UnleashClient}; use crate::middleware; use crate::tests::{features_from_disk, upstream_server}; use actix_http::{Request, StatusCode}; diff --git a/server/src/error.rs b/server/src/error.rs index 9fde19e0..7faa6ecb 100644 --- a/server/src/error.rs +++ b/server/src/error.rs @@ -103,6 +103,7 @@ pub enum EdgeError { EdgeTokenError, EdgeTokenParseError, FeatureNotFound(String), + Forbidden(String), FrontendExpectedToBeHydrated(String), FrontendNotYetHydrated(FrontendHydrationMissing), HealthCheckError(String), @@ -212,6 +213,7 @@ impl Display for EdgeError { } EdgeError::InvalidTokenWithStrictBehavior => write!(f, "Edge is running with strict behavior and the token is not subsumed by any registered tokens"), EdgeError::SseError(message) => write!(f, "{}", message), + EdgeError::Forbidden(reason) => write!(f, "{}", reason), } } } @@ -253,6 +255,7 @@ impl ResponseError for EdgeError { EdgeError::NotReady => StatusCode::SERVICE_UNAVAILABLE, EdgeError::InvalidTokenWithStrictBehavior => StatusCode::FORBIDDEN, EdgeError::SseError(_) => StatusCode::INTERNAL_SERVER_ERROR, + EdgeError::Forbidden(_) => StatusCode::FORBIDDEN, } } diff --git a/server/tests/streaming_test.rs b/server/tests/streaming_test.rs index fd92feaa..8ca9e674 100644 --- a/server/tests/streaming_test.rs +++ b/server/tests/streaming_test.rs @@ -11,6 +11,7 @@ mod streaming_test { sync::Arc, }; use unleash_edge::{ + cli::{EdgeArgs, EdgeMode, TokenHeader}, feature_cache::FeatureCache, http::broadcaster::Broadcaster, tokens::cache_key, @@ -188,6 +189,36 @@ mod streaming_test { }); test_server(move || { + // the streaming endpoint doesn't work unless app data contains an EdgeMode::Edge with streaming: true + let edge_mode = EdgeMode::Edge(EdgeArgs { + streaming: true, + upstream_url: "".into(), + backup_folder: None, + metrics_interval_seconds: 60, + features_refresh_interval_seconds: 60, + token_revalidation_interval_seconds: 60, + tokens: vec!["".into()], + custom_client_headers: vec![], + skip_ssl_verification: false, + client_identity: None, + upstream_certificate_file: None, + upstream_request_timeout: 5, + upstream_socket_timeout: 5, + redis: None, + s3: None, + token_header: TokenHeader { + token_header: "".into(), + }, + strict: true, + dynamic: false, + delta: false, + prometheus_remote_write_url: None, + prometheus_push_interval: 60, + prometheus_username: None, + prometheus_password: None, + prometheus_user_id: None, + }); + let config = serde_qs::actix::QsQueryConfig::default() .qs_config(serde_qs::Config::new(5, false)); let metrics_cache = MetricsCache::default(); @@ -205,6 +236,7 @@ mod streaming_test { .app_data(web::Data::from(upstream_token_cache.clone())) .app_data(web::Data::new(metrics_cache)) .app_data(web::Data::new(connect_via)) + .app_data(web::Data::new(edge_mode)) .service( web::scope("/api") .configure(unleash_edge::client_api::configure_client_api)