Java调用ChatGPT API并实现流式接收方式(Server-Sent Events,SSE)
目录
- 简介
- OkHttp 流式获取 GPT 响应
- 通过 SSE 流式推送前端
- 后端代码
- 消息实体
- 接口
- 接口实现
- 数据推送给前端
- 前端代码
- 创建sseClient.js
- vue3代码
- 优化后端代码
- 踩坑
- 总结
简介
用过 ChatGPT 的伙伴应该想过自己通过调用ChatGPT官网提供的接口来实现一个自己的问答机器人,但是在调用的时候发现,请求总是以传统的HTTP请求/响应模式进行,这意味着我们没发送一个请求后需要等待 ChatGPT 服务器返回完整的响应。这种方式在生成文本时并不不是我们理想的,因为用户体验不够流畅。
为了提供更好的用户体验,我们可以使用Server-Sent Events(SSE)技术来实现流式接收。这样,当ChatGPT 服务器可以在生成响应的同时逐步将内容推送给我们,我们在通过 SSE 流式推送到前端页面,让用户能够实时看到生成的内容。我将详细介绍如何在Java中实现这一功能。
OkHttp 流式获取 GPT 响应
其实市面上已经有很多现成的框架支持,但我们这里使用 okHttp 这个轻量级的HTTP客户端库来实现。
需要先引用相关maven:
<dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp-sse</artifactId> </dehttp://www.devze.compendency>
构建请求体,必须加上参数 stream
值为true
//构建发送内容 String messageStr = StrUtil.format(prompt, params); // 创建一个Message对象,该对象表示一个消息,并设置其属性 Message message = new Message(Message.Role.USER.getRole(), messageStr); // 创建一个ChatCompletion对象,表示聊天完成请求,并将刚创建的消息添加到其中 ChatCompletionRequest request = ChatCompletionRequest.builder() .model(ChatCompletionRequest.Model.GPT_3_5_TURBO.getName()) .messages(Arrays.asList(message)) .stream(true) .build();
// 定义see接口 Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions") .header("Authorization","xxx") .post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())) .build(); OkHttpClient okHttpClient = new OkHttpClient.Builder() .connectTimeout(10, TimeUnit.MINUTES) .readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天 .build(); // 实例化EventSource,注册EventSource监听器 RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() { @Override public void onOpen(EventSource eventSource, Response response) { 编程客栈 log.info("onOpen"); } @SneakyThrows @Override public void onEvent(EventSource eventSource, String id, String type, String data) { // log.info("onEvent"); // 在实际应用中,你可以在这里将数据推送给前端 log.info(data);//请求到的数据 } @Override public void onClosed(EventSource eventSource) { log.info("onClosed"); // emitter.complete(); } @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { log.error("onFailure 出现异常,response={}", response, t);//这边可以监听并重新打开 // emitter.complete(); } }); realEventSource.connect(okHttpClient);//真正开始请求的一步
通过 SSE 流式推送前端
sse(Server Sent Event),直译为服务器发送事件,顾名思义,也就是客户端可以获取到服务器发送的事件
我们常见的 http 交互方式是客户端发起请求,服务端响应,然后一次请求完毕;但是在 sse 的场景下,客户端发起请求,连接一直保持,服务端有数据就可以返回数据给客户端,这个返回可以是多次间隔的方式
原理是先建立链接,然后不断发消息就可以
我们利用 springboot
封装的 SseEmitter
来完成推送,需要用到以下依赖:
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.7.16</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency>
后端代码
消息实体
其中客户端 ID 是每个 SSE 链接的唯一标识,拿到 ID 可以精准的给唯一的用户推送消息,消息通过字符串的方式进行传递
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; /** * 消息体 */ @Data @AllArgsConstructor @NoArgsConstructor public class MessageVo { /** * 客户端id */ private String clientId; /** * 传输数据体(json) */ private String data; }
接口
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; public interface SseEmitterService { /** * 创建连接 * * @param clientId 客户端ID */ SseEmitter createConnect(String clientId); /** * 根据客户端id获取SseEmitter对象 * * @param clientId 客户端ID */ SseEmitter getSseEmitterByClientId(String clientId); /** * 发送消息给所有客户端 * * @param msg 消息内容 */ void sendMessageToAllClient(String msg); /** * 给指定客户端发送消息 * * @param clientId 客户端ID * @param msg 消息内容 */ void sendMessageToOneClient(String clientId, String msg); /** * 关闭连接 * * @param clientId 客户端ID */ void closeConnect(String clientId); }
接口实现
@Slf4j @Service public class SseEmitterServiceImpl implements SseEmitterService { /** * 容器,保存连接,用于输出返回 ;可使用其他方法实现 */ private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(); /** * 根据客户端id获取SseEmitter对象 * * @param clientId 客户端ID */ @Override public SseEmitter getSseEmitterByClientId(String clientId) { return sseCache.get(clientId); } /** * 创建连接 * * @param clientId 客户端ID */ @Override public SseEmitter createConnect(String clientId) { // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException SseEmitter sseEmitter = new SseEmitter(0L); // 是否需要给客户端推送ID if (StrUtil.isBlank(clientId)) { clientId = IdUtil.simpleUUID(); } // 注册回调 sseEmitter.onCompletion(completionCallBack(clientId)); // 长链接完成后回调接口(即关闭连接时调用) sseEmitter.onTimeout(timeoutCallBack(clientId)); // 连接超时回调 sseEmitter.onError(errorCallBack(clientId)); // 推送消息异常时,回调方法 sseCache.put(clientId, sseEmitter); log.info("创建新的sse连接,当前用户:{} 累计用户:{}", clientId, sseCache.size()); try { // 注册成功返回用户信息 sseEmitter.send(SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_CREATED)).data(clientId, MediaType.APPLICATION_JSON)); } catch (IOException e) { log.error("创建长链接异常,客户端ID:{} 异常信息:{}", clientId, e.getMessage()); } return sseEmitter; } /** * 发送消息给所有客户端 * * @param msg 消息内容 */ @Override public void sendMessageToAllClient(String msg) { if (MapUtil.isEmpty(sseCache)) { return; } // 判断发送的消息是否为空 for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) { MessageVo messageVo = new MessageVo(); messageVo.setClientId(entry.getKey()); messageVo.setData(msg); sendMsgToClientByClientId(entry.getKey(), messageVo, entry.getValue()); } } /** * 给指定客户端发送消息 * * @param clientId 客户端ID * @param msg 消息内容 */ @Override public void sendMessageToOneClient(String clientId, String msg) { MessageVo messageVo = new MessageVo(clientId, msg); sendMsgToClientByClientId(clientId, messageVo, sseCache.get(clientId)); } /** * 关闭连接 * * @param clientId 客户端ID */ @Override public void closeConnect(String clientId) { SseEmitter sseEmitter = sseCache.get(clientId); if (sseEmitter != null) { sseEmitter.complete(); removeUser(clientId); } } /** * 推送消息到客户端 * 此处做了推送失败后,重试推送机制,可根据自己业务进行修改 * * @param clientId 客户端ID * @param messageVo 推送信息,此处结合具体业务,定义自己的返回值即可 **/ private void sendMsgToClientByClientId(String clientId, MessageVo messageVo, SseEmitter sseEmitter) { if (sseEmitter == null) { log.error("推送消息失败:客户端{}未创建长链接,失败消息:{}", clientId, messageVo.toString()); return; } SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(String.valueOf(HttpStatus.HTTP_OK)) .data(messageVo, MediaType.APPLICATION_JSON); try { sseEmitter.send(sendData); } catch (IOException e) { // 推送消息失败,记录错误日志,进行重推 log.error("推送消息失败:{},尝试进行重推", messageVo.toString()); boolean isSuccess = true; // 推送消息失败后,每隔10s推送一次,推送5次 for (int i = 0; i < 5; i++) { try { Thread.sleep(10000); sseEmitter = sseCache.get(clientId); if (sseEmitter == null) { log.error("{}的第{}次消息重推失败,未创建长链接", clientId, i + 1); continue; } sseEmitter.send(sendData); } catch (Exception ex) { log.error("{}的第{}次消息重推失败", clientId, i + 1, ex); continue; } log.info("{}的第{}次消息重推成功,{}", clientId, i + 1, messageVo.toString()); return; } } } /** * 长链接完成后回调接口(即关闭连接时调用) * * @param clientId 客户端ID **/ private Runnable completionCallBack(String clientId) { return () -> { log.info("结束连接:{}", clientId); removeUser(clientjavascriptId); }; } /** * 连接超时时调用 * * @param clientId 客户端ID **/ private Runnable timeoutCallBack(String clientId) { return () -> { log.info("连接超时:{}", clientId); removeUser(clientId); }; } /** * 推送消息异常时,回调方法 * * @param clientId 客户端ID **/ private Consumer<Throwable> errorCallBack(String clientId) { return throwable -> { log.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId); // 推送消息失败后,每隔10s推送一次,推送5次 for (int i = 0; i < 5; i++) { try { Thread.sleep(10000); SseEmitter sseEmitter = sseCache.get(clientId); if (sseEmitter == null) { log.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId); continue; } sseEmitter.send("失败后重新推送"); } catch (Exception e) { e.printStackTrace(); } } }; } /** * 移除用户连接 * * @param clientId 客户端ID **/ private void removeUser(String clientId) { sseCache.remove(clientId); log.inf编程客栈o("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId); } }
数据推送给前端
在 onEvent 回调中添加代码,每接收到消息后就推送到前端
// 定义see接口 Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions") .header("Authorization","xxx") .post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())) .build(); OkHttpClient okHttpClient = new OkHttpClient.Builder() .connectTimeout(10, TimeUnit.MINUTES) .readTimeout(10, TimeUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天 .build(); // 实例化EventSource,注册EventSource监听器 RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() { @Override public void onOpen(EventSource eventSource, Response response) { log.info("onOpen"); } @Override public void onEvent(EventSource eventSource, String id, String type, String data) { if ("[DONE]".equals(data)) { System.out.println("收到 [DONE] 信号"); return; } ChatCompletionResp chatCompletionResp = JSON.parseobject(data, ChatCompletionResp.class); // 获得生成的文章内容 if (CollUtil.isEmpty(chatCompletionResp.getChoices())){ return; } Message delta = chatCompletionResp.getChoices().get(0).getDelta(); if (delta == null){ return; } sseEmitterService.sendMessageToOneClient(clientId , delta); log.info(data);//请求到的数据 } @Override public void onClosed(EventSource eventSource) { log.info("onClosed"); // emitter.complete(); } @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { log.error("onFailure 出现异常,response={}", response, t);//这边可以监听并重新打开 // emitter.complete(); } }); realEventSource.connect(okHttpClient);//真正开始请求的一步
前端代码
由于 EventSource 不允许直接配置请求头,普通的 EventSource 如果需要携带token请求,那就需要引入一个插件
安装 EventSourcePolyfill
你可以通过npm
安装 event-source-polyfill
:
npm install event-source-polyfill
引入 EventSourcePolyfill 后,它会自动替换浏览器中的原生 EventSource,其用法与原生的 API 一致。你可以像使用 EventSource 一样使用它:
创建sseClient.js
封装一下, sse 最佳实践,
// utils/sseClient.js import { EventSourcePolyfill } from 'event-source-polyfill' import { baseURL } from '../config'; // 封装一个创建 SSE 连接的方法 export function newEventSource({ clientId = '', headers = {}, onMessage, onError, onOpen }) { const token = sessionStorage.getItem('token') || '' const es = new EventSourcePolyfill(baseURL + 'p/sse/createConnect?clientId=' + clientId , { headers: { 'Authorization': `Bearer ${token}` ...headers }, heartbeatTimeout: 60 * 1000, // 心跳超时(可选) }) es.onopen = (event) => { console.log('SSE 连接已开启') onOpen && onOpen(event) } es.onmessage = (event) => { //前端:在接收到结束标识后立即销毁 if (event.data === '[DONE]') { console.log('SSE 连接已关闭') es.close() } onMessage && onMessage(event) } es.onerror = (event) => { console.error('SSE 错误:', event) onError && onError(event) es.close() // 出错时自动关闭 } return es // 返回实例,方便外部主动关闭 }
vue3代码
import { newEventSource } from '@/utils/sseClient.js' const createSseConnection = () => { return newEventSource({ clientId: 'xxx', onMessage: (event) => { console.log('Received SSE message:', event.data); } }); };
优化后端代码
按需建立连接并及时关闭 是非常关键的实践策略,每一个 SseEmitter 在服务端都是一个线程或者任务挂起的状态,太多不关闭会导致资源消耗(线程、连接、内存等);
如果每个用户长时间挂一个 SSE,不及时关闭,可能造成内存泄露或线程池耗尽,所以我们优化一下后端代码,在完成输出后及时关闭连接.
在关闭和异常的回调方法中添加:
sseEmitterService.sendMessageToOneClient(clientId, "[DONE]"); sseEmitterService.closeConnect(clientId);
修改后:
// 定义see接口 Request request = new Request.Builder().url("https://api.openai.com/v1/chat/completions") .header("Authorization","xxx") .post(okhttp3.RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"),param.toJSONString())) .build(); OkHttpClient okHttpClient = new OkHttpClient.Builder() .connectTimeout(10, TimeUnit.MINUTES) .readTimeout(10, TimeWhcupUnit.MINUTES)//这边需要将超时显示设置长一点,不然刚连上就断开,之前以为调用方式错误被坑了半天 .build(); // 实例化EventSource,注册EventSource监听器 RealEventSource realEventSource = new RealEventSource(request, new EventSourceListener() { @Override public void onOpen(EventSource eventSource, Response response) { log.info("onOpen"); } @SneakyThrows @Override public void onEvent(EventSource eventSource, String id, String type, String data) { // log.info("onEvent"); // 在实际应用中,你可以在这里将数据推送给前端 log.info(data);//请求到的数据 } @Override public void onClosed(EventSource eventSource) { log.info("onClosed"); sseEmitterService.sendMessageToOneClient(clientId, "[DONE]"); sseEmitterService.closeConnect(clientId); // emitter.complete(); } @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { log.error("onFailure 出现异常,response={}", response, t);//这边可以监听并重新打开 sseEmitterService.sendMessageToOneClient(clientId, "[DONE]"); sseEmitterService.closeConnect(clientId); // emitter.complete(); } }); realEventSource.connect(okHttpClient);//真正开始请求的一步
输出效果如下:
踩坑
sseEmitterService.sendMessageToOneClient(clientId, "[DONE]");
使用该方法发送纯文本时,里面不能有任何回车符 \n,因为发送到前端后换行符可能被当作字段结束符处理,最好封装对象然后转JSON字符串发送
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论