Enable streaming UI updates for chat responses
This commit is contained in:
@@ -87,15 +87,174 @@ func (c *Client) CreateChatCompletion(ctx context.Context, req ChatCompletionReq
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||
if req.Stream {
|
||||
return decodeStream(resp.Body)
|
||||
}
|
||||
return decodeSuccess(resp.Body)
|
||||
}
|
||||
|
||||
return nil, decodeError(resp.Body, resp.StatusCode)
|
||||
}
|
||||
|
||||
// StreamChatCompletion issues a streaming chat completion request and invokes handler for each chunk.
|
||||
func (c *Client) StreamChatCompletion(ctx context.Context, req ChatCompletionRequest, handler ChatCompletionStreamHandler) (*ChatCompletionResponse, error) {
|
||||
if c == nil {
|
||||
return nil, errors.New("client is nil")
|
||||
}
|
||||
|
||||
req.Stream = true
|
||||
payload, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encode request: %w", err)
|
||||
}
|
||||
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(payload))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
httpReq.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||
httpReq.Header.Set("Accept", "text/event-stream")
|
||||
|
||||
resp, err := c.httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("execute request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return nil, decodeError(resp.Body, resp.StatusCode)
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 1<<20)
|
||||
|
||||
type streamChunk struct {
|
||||
ID string `json:"id"`
|
||||
Object string `json:"object"`
|
||||
Choices []struct {
|
||||
Index int `json:"index"`
|
||||
Message ChatMessage `json:"message"`
|
||||
Delta ChatMessage `json:"delta"`
|
||||
FinishReason string `json:"finish_reason"`
|
||||
} `json:"choices"`
|
||||
Usage Usage `json:"usage"`
|
||||
}
|
||||
|
||||
var aggregated ChatCompletionResponse
|
||||
var builder strings.Builder
|
||||
role := "assistant"
|
||||
finish := ""
|
||||
var usage Usage
|
||||
usageReceived := false
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if !strings.HasPrefix(line, "data:") {
|
||||
continue
|
||||
}
|
||||
|
||||
payloadLine := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
|
||||
if payloadLine == "" {
|
||||
continue
|
||||
}
|
||||
if payloadLine == "[DONE]" {
|
||||
if handler != nil {
|
||||
if err := handler(ChatCompletionStreamEvent{Done: true}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
var chunk streamChunk
|
||||
if err := json.Unmarshal([]byte(payloadLine), &chunk); err != nil {
|
||||
return nil, fmt.Errorf("decode stream response: %w", err)
|
||||
}
|
||||
|
||||
if aggregated.ID == "" {
|
||||
aggregated.ID = chunk.ID
|
||||
}
|
||||
if aggregated.Object == "" {
|
||||
aggregated.Object = chunk.Object
|
||||
}
|
||||
|
||||
if chunk.Usage != (Usage{}) {
|
||||
usage = chunk.Usage
|
||||
usageReceived = true
|
||||
}
|
||||
|
||||
var chunkText string
|
||||
finishReason := ""
|
||||
if len(chunk.Choices) > 0 {
|
||||
choice := chunk.Choices[0]
|
||||
if choice.Message.Role != "" {
|
||||
role = choice.Message.Role
|
||||
}
|
||||
if choice.Delta.Role != "" {
|
||||
role = choice.Delta.Role
|
||||
}
|
||||
if choice.Delta.Content != "" {
|
||||
chunkText = choice.Delta.Content
|
||||
} else if choice.Message.Content != "" && builder.Len() == 0 {
|
||||
chunkText = choice.Message.Content
|
||||
}
|
||||
if choice.Message.Content != "" && builder.Len() == 0 && chunkText == "" {
|
||||
chunkText = choice.Message.Content
|
||||
}
|
||||
if choice.FinishReason != "" {
|
||||
finishReason = choice.FinishReason
|
||||
finish = choice.FinishReason
|
||||
}
|
||||
}
|
||||
|
||||
if chunkText != "" {
|
||||
builder.WriteString(chunkText)
|
||||
}
|
||||
|
||||
if handler != nil {
|
||||
event := ChatCompletionStreamEvent{
|
||||
ID: chunk.ID,
|
||||
Role: role,
|
||||
Content: chunkText,
|
||||
FinishReason: finishReason,
|
||||
}
|
||||
if usageReceived {
|
||||
u := usage
|
||||
event.Usage = &u
|
||||
}
|
||||
if err := handler(event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, fmt.Errorf("read stream: %w", err)
|
||||
}
|
||||
|
||||
content := strings.TrimSpace(builder.String())
|
||||
if content == "" {
|
||||
return nil, errors.New("stream response contained no content")
|
||||
}
|
||||
|
||||
aggregated.Choices = []ChatCompletionChoice{{
|
||||
Index: 0,
|
||||
Message: ChatMessage{
|
||||
Role: role,
|
||||
Content: content,
|
||||
},
|
||||
FinishReason: finish,
|
||||
}}
|
||||
if usageReceived {
|
||||
aggregated.Usage = usage
|
||||
}
|
||||
|
||||
return &aggregated, nil
|
||||
}
|
||||
|
||||
func decodeSuccess(r io.Reader) (*ChatCompletionResponse, error) {
|
||||
data, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
@@ -118,100 +277,6 @@ func decodeSuccess(r io.Reader) (*ChatCompletionResponse, error) {
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func decodeStream(r io.Reader) (*ChatCompletionResponse, error) {
|
||||
scanner := bufio.NewScanner(r)
|
||||
var payloads []json.RawMessage
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if strings.HasPrefix(line, "data: ") {
|
||||
payload := strings.TrimPrefix(line, "data: ")
|
||||
if payload == "[DONE]" {
|
||||
break
|
||||
}
|
||||
payloads = append(payloads, json.RawMessage(payload))
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
return nil, fmt.Errorf("read stream: %w", err)
|
||||
}
|
||||
|
||||
if len(payloads) == 0 {
|
||||
return nil, errors.New("empty stream response")
|
||||
}
|
||||
|
||||
type streamChunk struct {
|
||||
ID string `json:"id"`
|
||||
Object string `json:"object"`
|
||||
Choices []struct {
|
||||
Index int `json:"index"`
|
||||
Message ChatMessage `json:"message"`
|
||||
Delta ChatMessage `json:"delta"`
|
||||
FinishReason string `json:"finish_reason"`
|
||||
} `json:"choices"`
|
||||
Usage Usage `json:"usage"`
|
||||
}
|
||||
|
||||
var aggregated ChatCompletionResponse
|
||||
var builder strings.Builder
|
||||
role := "assistant"
|
||||
finish := ""
|
||||
|
||||
for _, payload := range payloads {
|
||||
var chunk streamChunk
|
||||
if err := json.Unmarshal(payload, &chunk); err != nil {
|
||||
return nil, fmt.Errorf("decode stream response: %w", err)
|
||||
}
|
||||
if aggregated.ID == "" {
|
||||
aggregated.ID = chunk.ID
|
||||
}
|
||||
if aggregated.Object == "" {
|
||||
aggregated.Object = chunk.Object
|
||||
}
|
||||
aggregated.Usage.PromptTokens += chunk.Usage.PromptTokens
|
||||
aggregated.Usage.CompletionTokens += chunk.Usage.CompletionTokens
|
||||
aggregated.Usage.TotalTokens += chunk.Usage.TotalTokens
|
||||
|
||||
if len(chunk.Choices) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
choice := chunk.Choices[0]
|
||||
if choice.Message.Role != "" {
|
||||
role = choice.Message.Role
|
||||
}
|
||||
if choice.Delta.Role != "" {
|
||||
role = choice.Delta.Role
|
||||
}
|
||||
if choice.Message.Content != "" {
|
||||
builder.WriteString(choice.Message.Content)
|
||||
}
|
||||
if choice.Delta.Content != "" {
|
||||
builder.WriteString(choice.Delta.Content)
|
||||
}
|
||||
if choice.FinishReason != "" {
|
||||
finish = choice.FinishReason
|
||||
}
|
||||
}
|
||||
|
||||
content := strings.TrimSpace(builder.String())
|
||||
if content == "" {
|
||||
return nil, errors.New("stream response contained no content")
|
||||
}
|
||||
|
||||
aggregated.Choices = []ChatCompletionChoice{{
|
||||
Index: 0,
|
||||
FinishReason: finish,
|
||||
Message: ChatMessage{
|
||||
Role: role,
|
||||
Content: content,
|
||||
},
|
||||
}}
|
||||
|
||||
return &aggregated, nil
|
||||
}
|
||||
|
||||
func decodeError(r io.Reader, status int) error {
|
||||
var apiErr ErrorResponse
|
||||
if err := json.NewDecoder(r).Decode(&apiErr); err != nil {
|
||||
|
@@ -37,6 +37,19 @@ type ChatCompletionResponse struct {
|
||||
Usage Usage `json:"usage"`
|
||||
}
|
||||
|
||||
// ChatCompletionStreamEvent represents a single chunk in a streaming chat completion.
|
||||
type ChatCompletionStreamEvent struct {
|
||||
ID string `json:"-"`
|
||||
Role string `json:"-"`
|
||||
Content string `json:"-"`
|
||||
FinishReason string `json:"-"`
|
||||
Usage *Usage `json:"-"`
|
||||
Done bool `json:"-"`
|
||||
}
|
||||
|
||||
// ChatCompletionStreamHandler consumes streaming completion events.
|
||||
type ChatCompletionStreamHandler func(ChatCompletionStreamEvent) error
|
||||
|
||||
// APIError captures structured error responses returned by the API.
|
||||
type APIError struct {
|
||||
Message string `json:"message"`
|
||||
|
Reference in New Issue
Block a user