Sistine Starter

AI 对话功能

基于火山引擎豆包 API 的流式对话实现指南

AI 对话功能

Sistine Starter 集成了火山引擎豆包 API 的对话功能,支持流式响应,提供流畅的用户体验。本文档详细介绍对话系统的架构、实现细节和集成方法。

功能概览

核心特性

  • 流式响应: 使用 Server-Sent Events (SSE) 实时推送 AI 回复
  • 会话管理: 完整的对话历史存储和上下文管理
  • 积分扣费: 每次对话消耗 10 积分,事务性扣费确保数据一致性
  • 上下文支持: 自动加载最近 20 条消息作为上下文
  • 错误处理: 完善的错误恢复和用户提示

技术架构

用户发送消息

身份认证 (Better Auth)

检查积分余额 (10 积分)

扣除积分 + 记录账本

创建/更新会话记录

调用火山引擎 API (流式)

实时推送回复内容 (SSE)

保存助手回复到数据库

更新会话统计信息

火山引擎配置

模型选择

当前使用的模型是 doubao-1-5-thinking-pro-250415,这是火山引擎豆包系列的高级思维模型。

模型特性:

  • 支持复杂推理任务
  • 上下文长度: 最大 32K tokens
  • 输出质量高,逻辑连贯性强

配置位置: lib/volcano-engine/chat.ts:19chat.ts:54

const model = volcanoEngineConfig.textModel || 'doubao-1-5-thinking-pro-250415';

API 参数配置

默认参数 (可在调用时自定义):

参数默认值说明
temperature0.7控制回复的随机性 (0-1,越高越随机)
top_p0.95核采样参数 (0-1,控制输出多样性)
max_tokens2048单次回复的最大 token 数
streamtrue是否启用流式响应

调整参数示例:

// lib/volcano-engine/chat.ts
const request: ChatRequest = {
  model,
  messages,
  stream: true,
  temperature: 0.8,    // 提高随机性
  top_p: 0.9,         // 减少多样性
  max_tokens: 4096,   // 增加最大长度
};

数据库架构

会话表 (chatSession)

存储每个对话会话的基本信息。建表语句请参考 数据库 文档中的 chatSession 表定义。

关键字段说明:

字段类型说明
titleTEXT会话标题,自动截取首条消息的前 100 个字符
modelTEXT记录使用的模型,便于后续分析和计费
totalMessagesIntINTEGER包含用户消息和助手回复的总数
totalCreditsUsedINTEGER该会话累计消耗的积分 (仅统计用户消息消耗)

消息表 (chatMessage)

存储会话中的每条消息。建表语句请参考 数据库 文档中的 chatMessage 表定义。

关键字段说明:

字段类型说明
roleTEXT'user' (用户消息) 或 'assistant' (AI 回复)
creditsUsedINTEGER用户消息为 10,助手回复为 0 (积分在用户发送时扣除)

流式响应实现

API 端点

路由: POST /api/chat/stream

实现文件: app/api/chat/stream/route.ts

请求格式

// POST /api/chat/stream
{
  "message": "你好,请介绍一下自己",
  "sessionId": "uuid-xxx-xxx" // 可选,不提供则创建新会话
}

参数说明:

参数类型必需说明
messagestring用户输入的消息内容
sessionIdstring会话 ID。不提供则创建新会话

响应格式 (SSE 流)

响应采用 Server-Sent Events (SSE) 格式,分为以下几种事件类型:

1. Metadata 事件 (会话信息)

data: {
  "type": "metadata",
  "sessionId": "uuid-xxx-xxx",
  "remainingCredits": 290
}

字段说明:

  • sessionId: 当前会话 ID (新会话或传入的会话 ID)
  • remainingCredits: 扣费后用户剩余积分

2. Content 事件 (内容流)

data: {
  "type": "content",
  "content": "你好"
}

data: {
  "type": "content",
  "content": "!我是"
}

data: {
  "type": "content",
  "content": " AI 助手"
}

字段说明:

  • content: 增量内容片段,前端需要累积拼接

3. Done 事件 (完成信号)

data: {
  "type": "done"
}

4. Error 事件 (错误处理)

data: {
  "type": "error",
  "error": "Failed to process chat"
}

核心实现逻辑

步骤 1: 身份认证与权限检查

// 1. 验证用户登录状态
const session = await auth.api.getSession({ headers: req.headers });
if (!session?.session?.userId) {
  return new Response(JSON.stringify({ error: "Unauthorized" }), {
    status: 401,
    headers: { 'Content-Type': 'application/json' }
  });
}

const userId = session.session.userId;

步骤 2: 解析请求并验证

// 2. 解析请求体
const { message, sessionId } = await req.json();

if (!message) {
  return new Response(JSON.stringify({ error: "Message is required" }), {
    status: 400,
    headers: { 'Content-Type': 'application/json' }
  });
}

步骤 3: 检查积分余额

// 3. 检查用户是否有足够积分 (10 积分)
const hasCredits = await canUserChat(userId);
if (!hasCredits) {
  return new Response(JSON.stringify({
    error: "Insufficient credits",
    remainingCredits: 0
  }), {
    status: 402,
    headers: { 'Content-Type': 'application/json' }
  });
}

注意: HTTP 状态码 402 Payment Required 用于表示积分不足。

步骤 4: 获取或创建会话

// 4. 获取或创建聊天会话
let chatSessionId = sessionId;
if (!chatSessionId) {
  // 创建新会话
  chatSessionId = randomUUID();
  await db.insert(chatSession).values({
    id: chatSessionId,
    userId,
    title: message.substring(0, 100),  // 截取消息前 100 字符作为标题
    model: "doubao-1-5-thinking-pro-250415",
  });
}

步骤 5: 扣除积分

// 5. 扣除积分 (事务性操作)
const deductResult = await deductCredits(
  userId,
  10,              // 消耗 10 积分
  "chat_usage",    // 原因标识
  chatSessionId    // 关联会话 ID
);

if (!deductResult.success) {
  return new Response(JSON.stringify({
    error: deductResult.error || "Failed to deduct credits",
    remainingCredits: deductResult.remainingCredits
  }), {
    status: 402,
    headers: { 'Content-Type': 'application/json' }
  });
}

步骤 6: 保存用户消息

// 6. 保存用户消息到数据库
const userMessageId = randomUUID();
await db.insert(chatMessage).values({
  id: userMessageId,
  sessionId: chatSessionId,
  role: "user",
  content: message,
  creditsUsed: 10,  // 记录消耗的积分
});

步骤 7: 加载上下文历史

// 7. 获取最近 20 条消息作为上下文
const messages = await db
  .select()
  .from(chatMessage)
  .where(eq(chatMessage.sessionId, chatSessionId))
  .orderBy(chatMessage.createdAt)
  .limit(20);

// 8. 构建 API 请求的消息数组
const chatMessages: ChatMessage[] = messages
  .filter(m => m.id !== userMessageId)  // 排除刚插入的消息
  .map(m => ({
    role: m.role as 'user' | 'assistant',
    content: m.content,
  }));

// 9. 添加当前消息
chatMessages.push({
  role: 'user',
  content: message,
});

步骤 8: 创建 SSE 流

// 10. 创建可读流用于 SSE 响应
const encoder = new TextEncoder();
const stream = new ReadableStream({
  async start(controller) {
    try {
      // 发送初始元数据
      controller.enqueue(encoder.encode(`data: ${JSON.stringify({
        type: 'metadata',
        sessionId: chatSessionId,
        remainingCredits: deductResult.remainingCredits
      })}\n\n`));

      // 调用火山引擎流式 API
      const volcanoStream = await volcanoEngine.createChatStream(chatMessages, {
        temperature: 0.7,
        top_p: 0.95,
        max_tokens: 2048,
      });

      const reader = volcanoStream.getReader();
      const decoder = new TextDecoder();
      let fullText = "";
      let buffer = "";

      // 逐块读取并解析
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        buffer = lines.pop() || '';

        for (const line of lines) {
          const parsed = parseSSEChunk(line);
          if (parsed) {
            if (parsed.done) break;

            const content = parsed.choices?.[0]?.delta?.content || '';
            if (content) {
              fullText += content;
              // 推送内容片段
              controller.enqueue(encoder.encode(`data: ${JSON.stringify({
                type: 'content',
                content
              })}\n\n`));
            }
          }
        }
      }

      // 保存完整回复到数据库
      await db.insert(chatMessage).values({
        id: randomUUID(),
        sessionId: chatSessionId,
        role: "assistant",
        content: fullText,
        creditsUsed: 0,  // 助手回复不消耗积分
      });

      // 更新会话统计
      await db.update(chatSession)
        .set({
          totalMessages: messages.length + 2,
          totalCreditsUsed: (messages.filter(m => m.role === "user").length + 1) * 10,
          lastMessageAt: new Date(),
        })
        .where(eq(chatSession.id, chatSessionId));

      // 发送完成信号
      controller.enqueue(encoder.encode(`data: ${JSON.stringify({
        type: 'done'
      })}\n\n`));

      controller.close();

    } catch (error: any) {
      console.error("Stream error:", error);
      controller.enqueue(encoder.encode(`data: ${JSON.stringify({
        type: 'error',
        error: error.message || "Failed to process chat"
      })}\n\n`));
      controller.close();
    }
  },
});

return new Response(stream, {
  headers: {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
  },
});

积分扣费机制

扣费规则

  • 消耗标准: 每次用户发送消息消耗 10 积分
  • 扣费时机: 在调用 AI API 之前扣除,防止积分不足时浪费 API 调用
  • 失败退款: 如果 AI 生成失败,当前实现不会自动退款 (可根据需求添加)

代码实现

import { canUserChat, deductCredits } from "@/lib/credits";

// 1. 检查余额
const hasCredits = await canUserChat(userId);
if (!hasCredits) {
  return new Response("Insufficient credits", { status: 402 });
}

// 2. 扣除积分
const deductResult = await deductCredits(
  userId,
  10,              // 消耗 10 积分
  "chat_usage",    // 原因
  chatSessionId    // 关联 ID
);

if (!deductResult.success) {
  return new Response(deductResult.error, { status: 402 });
}

积分消耗定义

定义位置: lib/credits.ts:6

const CHAT_CREDIT_COST = 10; // 每次对话消耗 10 积分

调整消耗量:

// 修改为 5 积分
const CHAT_CREDIT_COST = 5;

// 或者根据模型动态定价
const CHAT_CREDIT_COST = {
  'doubao-lite': 5,
  'doubao-pro': 10,
  'doubao-thinking-pro': 20,
};

客户端集成示例

前端实现 (React)

import { useState, useEffect } from "react";

export function ChatComponent() {
  const [messages, setMessages] = useState<Array<{ role: string; content: string }>>([]);
  const [input, setInput] = useState("");
  const [sessionId, setSessionId] = useState<string | null>(null);
  const [credits, setCredits] = useState<number>(0);
  const [isLoading, setIsLoading] = useState(false);

  const handleSendMessage = async () => {
    if (!input.trim()) return;

    // 添加用户消息到 UI
    const userMessage = { role: "user", content: input };
    setMessages(prev => [...prev, userMessage]);
    setInput("");
    setIsLoading(true);

    try {
      // 调用流式 API
      const response = await fetch("/api/chat/stream", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({
          message: input,
          sessionId: sessionId,
        }),
      });

      if (!response.ok) {
        const error = await response.json();
        throw new Error(error.error || "Failed to send message");
      }

      // 解析 SSE 流
      const reader = response.body?.getReader();
      const decoder = new TextDecoder();
      let assistantMessage = "";

      // 创建占位消息
      setMessages(prev => [...prev, { role: "assistant", content: "" }]);

      while (true) {
        const { done, value } = await reader!.read();
        if (done) break;

        const chunk = decoder.decode(value);
        const lines = chunk.split("\n");

        for (const line of lines) {
          if (line.startsWith("data: ")) {
            const data = JSON.parse(line.slice(6));

            if (data.type === "metadata") {
              // 保存会话 ID 和积分
              setSessionId(data.sessionId);
              setCredits(data.remainingCredits);
            } else if (data.type === "content") {
              // 累积内容
              assistantMessage += data.content;
              // 更新最后一条消息
              setMessages(prev => {
                const newMessages = [...prev];
                newMessages[newMessages.length - 1].content = assistantMessage;
                return newMessages;
              });
            } else if (data.type === "done") {
              console.log("Stream completed");
            } else if (data.type === "error") {
              throw new Error(data.error);
            }
          }
        }
      }
    } catch (error: any) {
      console.error("Chat error:", error);
      alert(error.message);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div>
      {/* 积分显示 */}
      <div>剩余积分: {credits}</div>

      {/* 消息列表 */}
      <div>
        {messages.map((msg, i) => (
          <div key={i} className={msg.role === "user" ? "user-message" : "assistant-message"}>
            <strong>{msg.role === "user" ? "你" : "AI"}:</strong>
            <p>{msg.content}</p>
          </div>
        ))}
      </div>

      {/* 输入框 */}
      <input
        value={input}
        onChange={(e) => setInput(e.target.value)}
        onKeyPress={(e) => e.key === "Enter" && handleSendMessage()}
        disabled={isLoading}
        placeholder="输入消息..."
      />
      <button onClick={handleSendMessage} disabled={isLoading}>
        {isLoading ? "发送中..." : "发送"}
      </button>
    </div>
  );
}

Fetch API 示例 (纯 JavaScript)

async function sendChatMessage(message, sessionId = null) {
  const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ message, sessionId }),
  });

  if (!response.ok) {
    const error = await response.json();
    throw new Error(error.error);
  }

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let fullText = '';
  let sessionInfo = null;

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value);
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = JSON.parse(line.slice(6));

        switch (data.type) {
          case 'metadata':
            sessionInfo = data;
            console.log('Session:', data.sessionId);
            console.log('Credits:', data.remainingCredits);
            break;
          case 'content':
            fullText += data.content;
            console.log('Content:', data.content);
            break;
          case 'done':
            console.log('Full response:', fullText);
            break;
          case 'error':
            throw new Error(data.error);
        }
      }
    }
  }

  return { sessionId: sessionInfo.sessionId, text: fullText };
}

// 使用示例
sendChatMessage('你好,请介绍一下自己')
  .then(result => console.log('Response:', result))
  .catch(error => console.error('Error:', error));

错误处理

常见错误码

状态码错误类型说明处理建议
401Unauthorized用户未登录跳转到登录页
402Payment Required积分不足提示用户购买积分
400Bad Request缺少必需参数检查请求格式
500Internal Server Error服务器错误稍后重试或联系支持

错误响应示例

// 积分不足
{
  "error": "Insufficient credits",
  "remainingCredits": 5
}

// 扣费失败
{
  "error": "Failed to deduct credits",
  "remainingCredits": 0
}

// 流式传输错误
data: {
  "type": "error",
  "error": "Failed to process chat"
}

前端错误处理建议

try {
  const response = await fetch('/api/chat/stream', { ... });

  if (response.status === 401) {
    // 未登录
    router.push('/login');
    return;
  }

  if (response.status === 402) {
    // 积分不足
    const error = await response.json();
    alert(`积分不足!当前余额: ${error.remainingCredits}`);
    router.push('/pricing');
    return;
  }

  if (!response.ok) {
    const error = await response.json();
    throw new Error(error.error);
  }

  // 处理流式响应...
} catch (error) {
  console.error('Chat error:', error);
  // 显示用户友好的错误消息
}

高级功能

自定义系统提示词

在消息数组开头添加系统消息:

const chatMessages: ChatMessage[] = [
  {
    role: 'system',
    content: '你是一个专业的代码助手,专注于帮助用户解决编程问题。'
  },
  ...historyMessages,
  {
    role: 'user',
    content: message
  }
];

上下文窗口管理

限制加载的历史消息数量以控制 token 消耗:

// 加载最近 10 条消息 (而不是默认的 20 条)
const messages = await db
  .select()
  .from(chatMessage)
  .where(eq(chatMessage.sessionId, chatSessionId))
  .orderBy(desc(chatMessage.createdAt))
  .limit(10);

多模型支持

根据用户订阅等级选择不同模型:

import { db } from "@/lib/db";
import { user as userTable } from "@/lib/db/schema";
import { eq } from "drizzle-orm";

// 获取用户计划
const users = await db.select().from(userTable).where(eq(userTable.id, userId));
const userPlan = users[0]?.planKey || "free";

// 根据计划选择模型
const modelMap = {
  free: "doubao-lite-4k",
  starter_monthly: "doubao-pro-4k",
  pro_monthly: "doubao-1-5-thinking-pro-250415",
};

const model = modelMap[userPlan] || "doubao-lite-4k";

会话标题自动生成

使用 AI 总结首条消息生成更好的标题:

// 如果消息过长,可以让 AI 生成标题
if (message.length > 100) {
  const titleResponse = await volcanoEngine.createChatCompletion([
    {
      role: 'user',
      content: `请用 10 个字以内总结这段对话的主题: ${message}`
    }
  ]);

  title = titleResponse.choices[0].message.content;
}

性能优化建议

1. 数据库索引

确保在关键字段上创建索引:

-- 会话查询优化
CREATE INDEX idx_chat_session_user_id ON chat_session(user_id);
CREATE INDEX idx_chat_session_updated_at ON chat_session(updated_at DESC);

-- 消息查询优化
CREATE INDEX idx_chat_message_session_id ON chat_message(session_id);
CREATE INDEX idx_chat_message_created_at ON chat_message(created_at);

2. 缓存用户积分

高频查询的积分数据可以使用 Redis 缓存:

import { redis } from "@/lib/redis";

async function getCachedCredits(userId: string): Promise<number> {
  const cached = await redis.get(`user:${userId}:credits`);
  if (cached !== null) return parseInt(cached);

  const credits = await getUserCredits(userId);
  await redis.set(`user:${userId}:credits`, credits, { ex: 60 }); // 缓存 60 秒
  return credits;
}

3. 限流保护

防止用户过快发送请求:

import { Ratelimit } from "@upstash/ratelimit";
import { redis } from "@/lib/redis";

const ratelimit = new Ratelimit({
  redis,
  limiter: Ratelimit.slidingWindow(10, "1 m"), // 每分钟最多 10 次
});

export async function POST(req: NextRequest) {
  const { success } = await ratelimit.limit(userId);
  if (!success) {
    return new Response("Too many requests", { status: 429 });
  }

  // 继续处理...
}

相关文档

总结

AI 对话功能通过以下关键设计实现高质量的用户体验:

  1. 流式响应: 使用 SSE 实时推送内容,避免长时间等待
  2. 完整会话管理: 自动加载上下文,支持多轮对话
  3. 事务性扣费: 确保积分扣除和账本记录的一致性
  4. 错误恢复: 完善的错误处理和用户提示

按照本文档的指导,你可以快速集成和定制对话功能以满足你的业务需求。