summaryrefslogtreecommitdiff
path: root/vendor/tonic-reflection/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/tonic-reflection/src/server')
-rw-r--r--vendor/tonic-reflection/src/server/mod.rs326
-rw-r--r--vendor/tonic-reflection/src/server/v1.rs138
-rw-r--r--vendor/tonic-reflection/src/server/v1alpha.rs138
3 files changed, 602 insertions, 0 deletions
diff --git a/vendor/tonic-reflection/src/server/mod.rs b/vendor/tonic-reflection/src/server/mod.rs
new file mode 100644
index 00000000..2b1a806a
--- /dev/null
+++ b/vendor/tonic-reflection/src/server/mod.rs
@@ -0,0 +1,326 @@
+use std::collections::HashMap;
+use std::fmt::{Display, Formatter};
+use std::sync::Arc;
+
+use prost::{DecodeError, Message};
+use prost_types::{
+ DescriptorProto, EnumDescriptorProto, FieldDescriptorProto, FileDescriptorProto,
+ FileDescriptorSet,
+};
+use tonic::Status;
+
+/// v1 interface for the gRPC Reflection Service server.
+pub mod v1;
+/// v1alpha interface for the gRPC Reflection Service server.
+pub mod v1alpha;
+
+/// A builder used to construct a gRPC Reflection Service.
+#[derive(Debug)]
+pub struct Builder<'b> {
+ file_descriptor_sets: Vec<FileDescriptorSet>,
+ encoded_file_descriptor_sets: Vec<&'b [u8]>,
+ include_reflection_service: bool,
+
+ service_names: Vec<String>,
+ use_all_service_names: bool,
+}
+
+impl<'b> Builder<'b> {
+ /// Create a new builder that can configure a gRPC Reflection Service.
+ pub fn configure() -> Self {
+ Builder {
+ file_descriptor_sets: Vec::new(),
+ encoded_file_descriptor_sets: Vec::new(),
+ include_reflection_service: true,
+
+ service_names: Vec::new(),
+ use_all_service_names: true,
+ }
+ }
+
+ /// Registers an instance of `prost_types::FileDescriptorSet` with the gRPC Reflection
+ /// Service builder.
+ pub fn register_file_descriptor_set(mut self, file_descriptor_set: FileDescriptorSet) -> Self {
+ self.file_descriptor_sets.push(file_descriptor_set);
+ self
+ }
+
+ /// Registers a byte slice containing an encoded `prost_types::FileDescriptorSet` with
+ /// the gRPC Reflection Service builder.
+ pub fn register_encoded_file_descriptor_set(
+ mut self,
+ encoded_file_descriptor_set: &'b [u8],
+ ) -> Self {
+ self.encoded_file_descriptor_sets
+ .push(encoded_file_descriptor_set);
+ self
+ }
+
+ /// Serve the gRPC Reflection Service descriptor via the Reflection Service. This is enabled
+ /// by default - set `include` to false to disable.
+ pub fn include_reflection_service(mut self, include: bool) -> Self {
+ self.include_reflection_service = include;
+ self
+ }
+
+ /// Advertise a fully-qualified gRPC service name.
+ ///
+ /// If not called, then all services present in the registered file descriptor sets
+ /// will be advertised.
+ pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
+ self.use_all_service_names = false;
+ self.service_names.push(name.into());
+ self
+ }
+
+ /// Build a v1 gRPC Reflection Service to be served via Tonic.
+ pub fn build_v1(
+ mut self,
+ ) -> Result<v1::ServerReflectionServer<impl v1::ServerReflection>, Error> {
+ if self.include_reflection_service {
+ self = self.register_encoded_file_descriptor_set(crate::pb::v1::FILE_DESCRIPTOR_SET);
+ }
+
+ Ok(v1::ServerReflectionServer::new(
+ v1::ReflectionService::from(ReflectionServiceState::new(
+ self.service_names,
+ self.encoded_file_descriptor_sets,
+ self.file_descriptor_sets,
+ self.use_all_service_names,
+ )?),
+ ))
+ }
+
+ /// Build a v1alpha gRPC Reflection Service to be served via Tonic.
+ pub fn build_v1alpha(
+ mut self,
+ ) -> Result<v1alpha::ServerReflectionServer<impl v1alpha::ServerReflection>, Error> {
+ if self.include_reflection_service {
+ self =
+ self.register_encoded_file_descriptor_set(crate::pb::v1alpha::FILE_DESCRIPTOR_SET);
+ }
+
+ Ok(v1alpha::ServerReflectionServer::new(
+ v1alpha::ReflectionService::from(ReflectionServiceState::new(
+ self.service_names,
+ self.encoded_file_descriptor_sets,
+ self.file_descriptor_sets,
+ self.use_all_service_names,
+ )?),
+ ))
+ }
+}
+
+#[derive(Debug)]
+struct ReflectionServiceState {
+ service_names: Vec<String>,
+ files: HashMap<String, Arc<FileDescriptorProto>>,
+ symbols: HashMap<String, Arc<FileDescriptorProto>>,
+}
+
+impl ReflectionServiceState {
+ fn new(
+ service_names: Vec<String>,
+ encoded_file_descriptor_sets: Vec<&[u8]>,
+ mut file_descriptor_sets: Vec<FileDescriptorSet>,
+ use_all_service_names: bool,
+ ) -> Result<Self, Error> {
+ for encoded in encoded_file_descriptor_sets {
+ file_descriptor_sets.push(FileDescriptorSet::decode(encoded)?);
+ }
+
+ let mut state = ReflectionServiceState {
+ service_names,
+ files: HashMap::new(),
+ symbols: HashMap::new(),
+ };
+
+ for fds in file_descriptor_sets {
+ for fd in fds.file {
+ let name = match fd.name.clone() {
+ None => {
+ return Err(Error::InvalidFileDescriptorSet("missing name".to_string()));
+ }
+ Some(n) => n,
+ };
+
+ if state.files.contains_key(&name) {
+ continue;
+ }
+
+ let fd = Arc::new(fd);
+ state.files.insert(name, fd.clone());
+ state.process_file(fd, use_all_service_names)?;
+ }
+ }
+
+ Ok(state)
+ }
+
+ fn process_file(
+ &mut self,
+ fd: Arc<FileDescriptorProto>,
+ use_all_service_names: bool,
+ ) -> Result<(), Error> {
+ let prefix = &fd.package.clone().unwrap_or_default();
+
+ for msg in &fd.message_type {
+ self.process_message(fd.clone(), prefix, msg)?;
+ }
+
+ for en in &fd.enum_type {
+ self.process_enum(fd.clone(), prefix, en)?;
+ }
+
+ for service in &fd.service {
+ let service_name = extract_name(prefix, "service", service.name.as_ref())?;
+ if use_all_service_names {
+ self.service_names.push(service_name.clone());
+ }
+ self.symbols.insert(service_name.clone(), fd.clone());
+
+ for method in &service.method {
+ let method_name = extract_name(&service_name, "method", method.name.as_ref())?;
+ self.symbols.insert(method_name, fd.clone());
+ }
+ }
+
+ Ok(())
+ }
+
+ fn process_message(
+ &mut self,
+ fd: Arc<FileDescriptorProto>,
+ prefix: &str,
+ msg: &DescriptorProto,
+ ) -> Result<(), Error> {
+ let message_name = extract_name(prefix, "message", msg.name.as_ref())?;
+ self.symbols.insert(message_name.clone(), fd.clone());
+
+ for nested in &msg.nested_type {
+ self.process_message(fd.clone(), &message_name, nested)?;
+ }
+
+ for en in &msg.enum_type {
+ self.process_enum(fd.clone(), &message_name, en)?;
+ }
+
+ for field in &msg.field {
+ self.process_field(fd.clone(), &message_name, field)?;
+ }
+
+ for oneof in &msg.oneof_decl {
+ let oneof_name = extract_name(&message_name, "oneof", oneof.name.as_ref())?;
+ self.symbols.insert(oneof_name, fd.clone());
+ }
+
+ Ok(())
+ }
+
+ fn process_enum(
+ &mut self,
+ fd: Arc<FileDescriptorProto>,
+ prefix: &str,
+ en: &EnumDescriptorProto,
+ ) -> Result<(), Error> {
+ let enum_name = extract_name(prefix, "enum", en.name.as_ref())?;
+ self.symbols.insert(enum_name.clone(), fd.clone());
+
+ for value in &en.value {
+ let value_name = extract_name(&enum_name, "enum value", value.name.as_ref())?;
+ self.symbols.insert(value_name, fd.clone());
+ }
+
+ Ok(())
+ }
+
+ fn process_field(
+ &mut self,
+ fd: Arc<FileDescriptorProto>,
+ prefix: &str,
+ field: &FieldDescriptorProto,
+ ) -> Result<(), Error> {
+ let field_name = extract_name(prefix, "field", field.name.as_ref())?;
+ self.symbols.insert(field_name, fd);
+ Ok(())
+ }
+
+ fn list_services(&self) -> &[String] {
+ &self.service_names
+ }
+
+ fn symbol_by_name(&self, symbol: &str) -> Result<Vec<u8>, Status> {
+ match self.symbols.get(symbol) {
+ None => Err(Status::not_found(format!("symbol '{symbol}' not found"))),
+ Some(fd) => {
+ let mut encoded_fd = Vec::new();
+ if fd.clone().encode(&mut encoded_fd).is_err() {
+ return Err(Status::internal("encoding error"));
+ };
+
+ Ok(encoded_fd)
+ }
+ }
+ }
+
+ fn file_by_filename(&self, filename: &str) -> Result<Vec<u8>, Status> {
+ match self.files.get(filename) {
+ None => Err(Status::not_found(format!("file '{filename}' not found"))),
+ Some(fd) => {
+ let mut encoded_fd = Vec::new();
+ if fd.clone().encode(&mut encoded_fd).is_err() {
+ return Err(Status::internal("encoding error"));
+ }
+
+ Ok(encoded_fd)
+ }
+ }
+ }
+}
+
+fn extract_name(
+ prefix: &str,
+ name_type: &str,
+ maybe_name: Option<&String>,
+) -> Result<String, Error> {
+ match maybe_name {
+ None => Err(Error::InvalidFileDescriptorSet(format!(
+ "missing {name_type} name"
+ ))),
+ Some(name) => {
+ if prefix.is_empty() {
+ Ok(name.to_string())
+ } else {
+ Ok(format!("{prefix}.{name}"))
+ }
+ }
+ }
+}
+
+/// Represents an error in the construction of a gRPC Reflection Service.
+#[derive(Debug)]
+pub enum Error {
+ /// An error was encountered decoding a `prost_types::FileDescriptorSet` from a buffer.
+ DecodeError(prost::DecodeError),
+ /// An invalid `prost_types::FileDescriptorProto` was encountered.
+ InvalidFileDescriptorSet(String),
+}
+
+impl From<DecodeError> for Error {
+ fn from(e: DecodeError) -> Self {
+ Error::DecodeError(e)
+ }
+}
+
+impl std::error::Error for Error {}
+
+impl Display for Error {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Error::DecodeError(_) => f.write_str("error decoding FileDescriptorSet from buffer"),
+ Error::InvalidFileDescriptorSet(s) => {
+ write!(f, "invalid FileDescriptorSet - {s}")
+ }
+ }
+ }
+}
diff --git a/vendor/tonic-reflection/src/server/v1.rs b/vendor/tonic-reflection/src/server/v1.rs
new file mode 100644
index 00000000..6a5054f9
--- /dev/null
+++ b/vendor/tonic-reflection/src/server/v1.rs
@@ -0,0 +1,138 @@
+use std::{fmt, sync::Arc};
+
+use tokio::sync::mpsc;
+use tokio_stream::{Stream, StreamExt};
+use tonic::{Request, Response, Status, Streaming};
+
+use super::ReflectionServiceState;
+use crate::pb::v1::server_reflection_request::MessageRequest;
+use crate::pb::v1::server_reflection_response::MessageResponse;
+pub use crate::pb::v1::server_reflection_server::{ServerReflection, ServerReflectionServer};
+use crate::pb::v1::{
+ ExtensionNumberResponse, FileDescriptorResponse, ListServiceResponse, ServerReflectionRequest,
+ ServerReflectionResponse, ServiceResponse,
+};
+
+/// An implementation for `ServerReflection`.
+#[derive(Debug)]
+pub struct ReflectionService {
+ state: Arc<ReflectionServiceState>,
+}
+
+#[tonic::async_trait]
+impl ServerReflection for ReflectionService {
+ type ServerReflectionInfoStream = ServerReflectionInfoStream;
+
+ async fn server_reflection_info(
+ &self,
+ req: Request<Streaming<ServerReflectionRequest>>,
+ ) -> Result<Response<Self::ServerReflectionInfoStream>, Status> {
+ let mut req_rx = req.into_inner();
+ let (resp_tx, resp_rx) = mpsc::channel::<Result<ServerReflectionResponse, Status>>(1);
+
+ let state = self.state.clone();
+
+ tokio::spawn(async move {
+ while let Some(req) = req_rx.next().await {
+ let Ok(req) = req else {
+ return;
+ };
+
+ let resp_msg = match req.message_request.clone() {
+ None => Err(Status::invalid_argument("invalid MessageRequest")),
+ Some(msg) => match msg {
+ MessageRequest::FileByFilename(s) => state.file_by_filename(&s).map(|fd| {
+ MessageResponse::FileDescriptorResponse(FileDescriptorResponse {
+ file_descriptor_proto: vec![fd],
+ })
+ }),
+ MessageRequest::FileContainingSymbol(s) => {
+ state.symbol_by_name(&s).map(|fd| {
+ MessageResponse::FileDescriptorResponse(FileDescriptorResponse {
+ file_descriptor_proto: vec![fd],
+ })
+ })
+ }
+ MessageRequest::FileContainingExtension(_) => {
+ Err(Status::not_found("extensions are not supported"))
+ }
+ MessageRequest::AllExtensionNumbersOfType(_) => {
+ // NOTE: Workaround. Some grpc clients (e.g. grpcurl) expect this method not to fail.
+ // https://github.com/hyperium/tonic/issues/1077
+ Ok(MessageResponse::AllExtensionNumbersResponse(
+ ExtensionNumberResponse::default(),
+ ))
+ }
+ MessageRequest::ListServices(_) => {
+ Ok(MessageResponse::ListServicesResponse(ListServiceResponse {
+ service: state
+ .list_services()
+ .iter()
+ .map(|s| ServiceResponse { name: s.clone() })
+ .collect(),
+ }))
+ }
+ },
+ };
+
+ match resp_msg {
+ Ok(resp_msg) => {
+ let resp = ServerReflectionResponse {
+ valid_host: req.host.clone(),
+ original_request: Some(req.clone()),
+ message_response: Some(resp_msg),
+ };
+ resp_tx.send(Ok(resp)).await.expect("send");
+ }
+ Err(status) => {
+ resp_tx.send(Err(status)).await.expect("send");
+ return;
+ }
+ }
+ }
+ });
+
+ Ok(Response::new(ServerReflectionInfoStream::new(resp_rx)))
+ }
+}
+
+impl From<ReflectionServiceState> for ReflectionService {
+ fn from(state: ReflectionServiceState) -> Self {
+ Self {
+ state: Arc::new(state),
+ }
+ }
+}
+
+/// A response stream.
+pub struct ServerReflectionInfoStream {
+ inner: tokio_stream::wrappers::ReceiverStream<Result<ServerReflectionResponse, Status>>,
+}
+
+impl ServerReflectionInfoStream {
+ fn new(resp_rx: mpsc::Receiver<Result<ServerReflectionResponse, Status>>) -> Self {
+ let inner = tokio_stream::wrappers::ReceiverStream::new(resp_rx);
+ Self { inner }
+ }
+}
+
+impl Stream for ServerReflectionInfoStream {
+ type Item = Result<ServerReflectionResponse, Status>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ std::pin::Pin::new(&mut self.inner).poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.inner.size_hint()
+ }
+}
+
+impl fmt::Debug for ServerReflectionInfoStream {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("ServerReflectionInfoStream").finish()
+ }
+}
diff --git a/vendor/tonic-reflection/src/server/v1alpha.rs b/vendor/tonic-reflection/src/server/v1alpha.rs
new file mode 100644
index 00000000..b21d8d91
--- /dev/null
+++ b/vendor/tonic-reflection/src/server/v1alpha.rs
@@ -0,0 +1,138 @@
+use std::{fmt, sync::Arc};
+
+use tokio::sync::mpsc;
+use tokio_stream::{Stream, StreamExt};
+use tonic::{Request, Response, Status, Streaming};
+
+use super::ReflectionServiceState;
+use crate::pb::v1alpha::server_reflection_request::MessageRequest;
+use crate::pb::v1alpha::server_reflection_response::MessageResponse;
+pub use crate::pb::v1alpha::server_reflection_server::{ServerReflection, ServerReflectionServer};
+use crate::pb::v1alpha::{
+ ExtensionNumberResponse, FileDescriptorResponse, ListServiceResponse, ServerReflectionRequest,
+ ServerReflectionResponse, ServiceResponse,
+};
+
+/// An implementation for `ServerReflection`.
+#[derive(Debug)]
+pub struct ReflectionService {
+ state: Arc<ReflectionServiceState>,
+}
+
+#[tonic::async_trait]
+impl ServerReflection for ReflectionService {
+ type ServerReflectionInfoStream = ServerReflectionInfoStream;
+
+ async fn server_reflection_info(
+ &self,
+ req: Request<Streaming<ServerReflectionRequest>>,
+ ) -> Result<Response<Self::ServerReflectionInfoStream>, Status> {
+ let mut req_rx = req.into_inner();
+ let (resp_tx, resp_rx) = mpsc::channel::<Result<ServerReflectionResponse, Status>>(1);
+
+ let state = self.state.clone();
+
+ tokio::spawn(async move {
+ while let Some(req) = req_rx.next().await {
+ let Ok(req) = req else {
+ return;
+ };
+
+ let resp_msg = match req.message_request.clone() {
+ None => Err(Status::invalid_argument("invalid MessageRequest")),
+ Some(msg) => match msg {
+ MessageRequest::FileByFilename(s) => state.file_by_filename(&s).map(|fd| {
+ MessageResponse::FileDescriptorResponse(FileDescriptorResponse {
+ file_descriptor_proto: vec![fd],
+ })
+ }),
+ MessageRequest::FileContainingSymbol(s) => {
+ state.symbol_by_name(&s).map(|fd| {
+ MessageResponse::FileDescriptorResponse(FileDescriptorResponse {
+ file_descriptor_proto: vec![fd],
+ })
+ })
+ }
+ MessageRequest::FileContainingExtension(_) => {
+ Err(Status::not_found("extensions are not supported"))
+ }
+ MessageRequest::AllExtensionNumbersOfType(_) => {
+ // NOTE: Workaround. Some grpc clients (e.g. grpcurl) expect this method not to fail.
+ // https://github.com/hyperium/tonic/issues/1077
+ Ok(MessageResponse::AllExtensionNumbersResponse(
+ ExtensionNumberResponse::default(),
+ ))
+ }
+ MessageRequest::ListServices(_) => {
+ Ok(MessageResponse::ListServicesResponse(ListServiceResponse {
+ service: state
+ .list_services()
+ .iter()
+ .map(|s| ServiceResponse { name: s.clone() })
+ .collect(),
+ }))
+ }
+ },
+ };
+
+ match resp_msg {
+ Ok(resp_msg) => {
+ let resp = ServerReflectionResponse {
+ valid_host: req.host.clone(),
+ original_request: Some(req.clone()),
+ message_response: Some(resp_msg),
+ };
+ resp_tx.send(Ok(resp)).await.expect("send");
+ }
+ Err(status) => {
+ resp_tx.send(Err(status)).await.expect("send");
+ return;
+ }
+ }
+ }
+ });
+
+ Ok(Response::new(ServerReflectionInfoStream::new(resp_rx)))
+ }
+}
+
+impl From<ReflectionServiceState> for ReflectionService {
+ fn from(state: ReflectionServiceState) -> Self {
+ Self {
+ state: Arc::new(state),
+ }
+ }
+}
+
+/// A response stream.
+pub struct ServerReflectionInfoStream {
+ inner: tokio_stream::wrappers::ReceiverStream<Result<ServerReflectionResponse, Status>>,
+}
+
+impl ServerReflectionInfoStream {
+ fn new(resp_rx: mpsc::Receiver<Result<ServerReflectionResponse, Status>>) -> Self {
+ let inner = tokio_stream::wrappers::ReceiverStream::new(resp_rx);
+ Self { inner }
+ }
+}
+
+impl Stream for ServerReflectionInfoStream {
+ type Item = Result<ServerReflectionResponse, Status>;
+
+ fn poll_next(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Option<Self::Item>> {
+ std::pin::Pin::new(&mut self.inner).poll_next(cx)
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.inner.size_hint()
+ }
+}
+
+impl fmt::Debug for ServerReflectionInfoStream {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_tuple("ServerReflectionInfoStream").finish()
+ }
+}