Skip to content

[Bugfix]fix error response in pd_router #9505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

Bruce-x-1997
Copy link
Contributor

Motivation

when a streaming request meets error , it should return a streaming response(using create_streaming_response), not a non-stream response

Modifications

I achieve a function call handle_decode_error_response to handle the error response in pd_router
and when it's a streaming req, it could return streaming error response by calling create_streaming_response

Checklist

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary of Changes

Hello @Bruce-x-1997, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request fixes a bug in the pd_router where streaming requests were incorrectly returning non-streaming error responses. The change introduces a new helper function, handle_decode_error_response, to ensure that all error responses, especially for streaming requests, are properly formatted and streamed back to the client.

Highlights

  • Improved Error Handling for Streaming Requests: Ensures that when a streaming request encounters an error, the response is also streamed, providing a consistent user experience.
  • Centralized Error Response Logic: Introduces a new asynchronous function handle_decode_error_response within the PDRouter to encapsulate and standardize how error responses are generated and returned.
  • Refactored Error Response Paths: Replaces duplicated error handling logic in execute_dual_dispatch_internal with calls to the new handle_decode_error_response function, improving code maintainability.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in issue comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request correctly identifies the need to handle streaming error responses differently from non-streaming ones by introducing the handle_decode_error_response function. This is a great improvement for error handling consistency.

I've found a few critical issues in the implementation that would either prevent the code from compiling or lead to incorrect behavior at runtime. My review includes suggestions to fix a type mismatch in the new function's signature, correct the construction of the JSON error payload to prevent malformed responses, and fix the function calls to ensure proper control flow and avoid move-related compiler errors. Addressing these points will make the new error handling logic robust and correct.

@@ -804,6 +804,58 @@ impl PDRouter {
.await
}

async fn handle_decode_error_response(
res: reqwest::Response,
context: &GenerateContext,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The type GenerateContext is used here, but it seems to be a typo for PDRequestContext. The call sites for this function are inside execute_dual_dispatch_internal, where the context variable is of type PDRequestContext. Using the correct type is necessary for the code to compile.

Suggested change
context: &GenerateContext,
context: &PDRequestContext,

Comment on lines +817 to +835
let error_message = match res.bytes().await {
Ok(error_body) => {
if let Ok(error_json) = serde_json::from_slice::<Value>(&error_body) {
serde_json::to_string(&error_json).unwrap_or_else(|_| {
format!("Decode server error: {}", status)
})
} else {
String::from_utf8_lossy(&error_body).to_string()
}
}
Err(e) => {
format!("Decode server error: {}", e)
}
};

let error_stream = tokio_stream::once(Ok(axum::body::Bytes::from(
format!("data: {{\"error\": {{\"message\": \"{}\", \"status\": {}}}}}\n\n",
error_message, status.as_u16())
)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current implementation for creating the streaming error response can produce invalid JSON. The error_message string is embedded directly into a JSON string using format!, which doesn't escape special characters (like quotes). If error_message contains JSON or quotes, the resulting SSE data will be malformed.

To fix this, it's safer to construct a serde_json::Value and then serialize it. This ensures the final output is always valid JSON. The suggested change also improves the structure by embedding the upstream JSON error as an object in the message field, rather than a string, which provides a more structured error to the client.

Suggested change
let error_message = match res.bytes().await {
Ok(error_body) => {
if let Ok(error_json) = serde_json::from_slice::<Value>(&error_body) {
serde_json::to_string(&error_json).unwrap_or_else(|_| {
format!("Decode server error: {}", status)
})
} else {
String::from_utf8_lossy(&error_body).to_string()
}
}
Err(e) => {
format!("Decode server error: {}", e)
}
};
let error_stream = tokio_stream::once(Ok(axum::body::Bytes::from(
format!("data: {{\"error\": {{\"message\": \"{}\", \"status\": {}}}}}\n\n",
error_message, status.as_u16())
)));
let error_payload = match res.bytes().await {
Ok(error_body) => {
if let Ok(error_json) = serde_json::from_slice::<Value>(&error_body) {
// The error body is valid JSON, embed it directly as the message.
json!({ "message": error_json, "status": status.as_u16() })
} else {
// The error body is not JSON, treat it as a plain string.
json!({ "message": String::from_utf8_lossy(&error_body).to_string(), "status": status.as_u16() })
}
}
Err(e) => {
// Failed to read the error body.
json!({ "message": format!("Decode server error: {}", e), "status": status.as_u16() })
}
};
let sse_data = format!("data: {{"error": {}}}
", serde_json::to_string(&error_payload).unwrap_or_default());
let error_stream = tokio_stream::once(Ok(axum::body::Bytes::from(sse_data)));

.into_response();
}
}
self.handle_decode_error_response(res, context, decode.url())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There are two issues with this function call:

  1. A return statement is missing. Without it, the function will continue execution after handling the error, which is not the intended behavior. The original code had a return, and it's still needed here.
  2. The context variable is used later in the function, so it cannot be moved. It must be passed by reference using &context to avoid a compiler error.
Suggested change
self.handle_decode_error_response(res, context, decode.url())
return self.handle_decode_error_response(res, &context, decode.url());

(status, format!("Decode server error: {}", e)).into_response()
}
}
self.handle_decode_error_response(res, context, decode.url())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The context variable is used in the following else if block, so it cannot be moved into the handle_decode_error_response function. You need to pass it by reference (&context) to avoid a compiler error.

Suggested change
self.handle_decode_error_response(res, context, decode.url())
self.handle_decode_error_response(res, &context, decode.url())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant