Java实现Socket服务端与客户端双向通信功能
目录
- 原理
- 代码实现
- 服务端
- 客户端
- 总结
原理
Socket通信是计算机网络中常用的一种通信机制,它是基于TCP/IP协议实现的,提供了两个应用程序之间通过网络进行数据交换的能力。Socket本质上是一种抽象概念,为网络服务提供了一组API接口。
- Socket通信模型
Socket通信模型通常包括客户端和服务器端两部分。 服务器端:负责在特定的端口监听来自客户端的连接请求,当一个请求到达时,服务器会与客户端建立连接,并为客户端提供相应的服务。 客户端:主动向服务器的特定IP地址和端口发起连接请求,连接成功后,客户端可以通过建立的连接向服务器发送请求并接收响应。
- Socket通信过程
Socket通信过程一般包括以下几个步骤:
- 服务器监听:
服务器通过socket()函数创建一个Socket,并通过bind()函数将其绑定到一个IP地址和端口上。然后,服务器调用listen()函数开始监听该端口上的连接请求。
- 客户端请求连接:
客户端也通过socket()函数创建一个Socket,然后调用connect()函数尝试与服务器的指定IP地址和端口建立连接。
- 服务器接受连接:
服务器在接收到客户端的连接请求后,通过accept()函数接受这个连接。如果成功,accept()函数会返回一个新的Socket(通常称为“子Socket”),用于与该客户端进行通信。
数据传输:连接建立成功后,客户端和服务器就可以通过新建立的Socket进行数据传输了。数据传输可以是单向的也可以是双向的。应用程序可以使用send(), write(), recv(), read()等函数进行数据发送和接收操作。
- 断开连接:
当通信结束后,客户端和服务器都可以调用close()函数来关闭自己持有的Socket,从而断开两者之间的连接。 TCP vs UDP 在实际使用中,基于Socket的通信方式主要有两种:基于TCP和基于UDP。 TCP Socket:提供可靠、面向连接、基于字节流的通信方式。适用对数据完整性和顺序有要求的应用场景。 UDP Socket:提供无连接、不保证可靠性、基于消息(数据报)的通信方式。适用于对实时性要求高、容忍部分数据丢失或乱序的应用场景。
代码实现
服务端
服务端主体逻辑:和每个接入的客户端都会使用独立线程建立起长连接,二者之间使用心跳保持联系,使用clientSockets 存储了每个客户端的信息便于和客户端建立起联系。
package com.example.demo2.server.socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.stereotype.Component; import Javax.annotation.PostConstruct; import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; import java.util.concurrent.*; /** * @author kim */ @Component public class TcpServer implements DisposableBean { private static final Logger logger = LoggerFactory.getLogger(TcpServer.class); private final SocketServerConfig config; private ServerSocket serverSocket; private ExecutorService executorService; private volatile boolean running = true; // 存储客户端连接 private final Map<String, Socket> clientSockets = new ConcurrentHashMap<>(); public TcpServer(SocketServerConfig config) { this.config = config; } @PostConstruct public void start() throws IOException { executorService = Executors.newFixedThreadPool(config.getMaxThreads()); serverSocket = new ServerSocket(config.getPort()); logger.info("平台socket服务已启动, 监听端口为 {}", config.getPort()); new Thread(this::acceptConnections).start(); } private void acceptConnections() { while (running) { try { Socket clientSocket = serverSocket.accept(); String clientAddress = clientSocket.getInetAddrhttp://www.devze.comess().getHostAddress(); clientSockets.put(clientAddress, clientSocket); executorService.execute(new ClientHandler(clientSocket, clientAddress)); } catch (IOException e) { if (running) { logger.error("Connection accept error", e); } } } } // 用于发送消息到特定客户端 public void sendMessageToClient(String clientAddress, String message) throws IOException { Socket socket = clientSockets.get(clientAddress); if (socket != null && !socket.isClosed()) { PrintWriter out = new PrintWriter(socket.getOutputStream(), true); out.println(message); logger.info("Sent message to {}: {}", clientAddress, message); } else { logger.warn("Client {} is not connected or socket is closed", clientAddress); } } @Override public void destroy() throws Exception { running = false; executorService.shutdown(); for (Socket socket :编程 clientSockets.values()) { if (!socket.isClosed()) { socket.close(); } } if (serverSocket != null && !serverSocket.isClosed()) { serverSocket.close(); } logger.info("TCP Server stopped"); } private class ClientHandler implements Runnable { private final Socket clientSocket; private final String clientAddress; ClientHandler(Socket socket, String address) { this.clientSocket = socket; this.clientAddress = address; } @Override public void run() { try (BufferedReader in = new BufferedReader( new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter( clientSocket.getOutputStream(), true)) { logger.info("Client connected: {}", clientAddress); String input; while ((input = in.readLine()) != null) { logger.debug("Received: {}", input); out.println(input); logger.info("Client connected: {}", clientAddress); } } catch (IOException e) { logger.warn("Client connection closed: {}", e.getMessage()); } finally { try { clientSockets.remove(clientAddress); clientSocket.close(); } catch (IOException e) { logger.error("Error closing socket", e); } } } } }
配置类
package com.example.demo2.server.socket; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "socket.server") @Data public class SocketServerConfig { private int port; private int maxThreads; // Getters and Setters }
配置文件
server: port: 8080 socket: server: port: 8088 maxThreads: 50
向客户端发送测试信息
@GetMapping("/send") public String sendMessage(String clientAddress) throws IOException { tcpServer.sendMessageToClient("192.168.3.8","77777777777"); return "success"; }
服务端发送日志
客户端
客户端主体逻辑,使用自己设计的心跳机制,监听服务端状态,如果服务端断开连接,则客户端会尝试重新建立联系。
package com.example.demo1.socketclient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Service; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @Service public class TcpClientService implements ApplicationRunner, DisposableBean { private static final Logger logger = LoggerFactory.getLogger(TcpClientServiphpce.class); private final SocketClientConfig config; private Socket socket; private PrintWriter out; private BufferedReader in; private final AtomicBoolean running = new AtomicBoolean(true); private final ExecutorService executor = Executors.newSingleThreadExecutor(); private final MessageListener messageListener; @Autowired public TcpClientService(SocketClientConfig config, MessageListener messageListener) { this.config = config; this.messageListener = messageListener; } @Override public void run(ApplicationArgumejsnts args) throws Exception { initializeConnection(); } @Override public void destroy() throws Exception { running.set(false); closeResources(); executor.shutdown(); } private synchronized void initializeConnection() { new Thread(() -> { while (running.get()) { try { socket = new Socket(); socket.setKeepAlive(true); socket.setSoTimeout(config.getHeartbeatTimeout()); socket.connect(new InetSocketAddress(config.getHost(), config.getPort()), config.getTimeout()); out = new PrintWriter(socket.getOutputStream(), true); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); logger.info("Connected to server {}:{}", config.getHost(), config.getPort()); executor.execute(this::listenToServer); startHeartbeat(); while (!socket.isClosed() && socket.isConnected()) { Thread.sleep(1000); } } catch (Exception e) { logger.warn("Connection error: {}", e.getMessage()); } finally { closeResources(); if (running.get()) { logger.info("Attempting to reconnect in 5 seconds..."); sleepSafely(5000); } } } }, "tcp-client-connector").start(); } private void listenToServer() { try { String response; while (running.get() && !socket.isClosed()) { try { response = in.readLine(); if (response == null) { logger.warn("Server closed connection"); break; // 终止循环,表示连接已关闭 } logger.debug("Received server message: {}", response); messageListener.onMessage(response); } catch (SocketTimeoutException e) { logger.debug("Socket read timeout"); } catch (IOException e) { if (!socket.isClosed()) { logger.warn("Connection lost: {}", e.getMessage()); break; // 终止循环,表示连接已中断 } } } } finally { closeResources(); // 确保资源关闭 } } private void startHeartbeat() { new Thread(() -> { while (running.get() && !socket.isClosed()) { try { sendMessageInternal("HEARTBEAT"); sleepSafely(config.getHeartbeatInterval()); } catch (Exception e) { logger.warn("Heartbeat failed: {}", e.getMessage()); break; } } }, "heartbeat-thread").start(); } public synchronized void sendMessage(String message) throws IOException { if (socket == null || !socket.isConnected()) { throw new IOException("Not connected to server"); } out.println(message); logger.debug("Sent message: {}", message); } private synchronized void sendMessageInternal(String message) { try { if (socket != null && socket.isConnected()) { out.println(message); } } catch (Exception e) { logger.warn("Failed to send heandroidartbeat"); } } private synchronized void closeResources() { try { if (out != null) out.close(); if (in != null) in.close(); if (socket != null) socket.close(); } catch (IOException e) { logger.warn("Error closing resources: {}", e.getMessage()); } } private void sleepSafely(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } public interface MessageListener { void onMessage(String message); } }
消息监听:监听服务发送的消息
package com.example.demo1.socketclient; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @Component public class ServerMessageHandler implements TcpClientService.MessageListener { private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class); @Override public void onMessage(String message) { if(StringUtils.isNotEmpty(message)){ if (!message.contains("HEARTBEAT")){ //处理其他逻辑 System.out.println("接收服务端消息成功:"+message); }else{ //心跳消息 System.out.println(message); } } } }
配置类
package com.example.demo1.socketclient; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "socket.client") @Data public class SocketClientConfig { private String host; private int port; private int timeout; private int heartbeatInterval; private int heartbeatTimeout; // Getters and Setters }
发送测试方法
@GetMapping("/send") public ResponseEntity<String> sendMessage() { try { tcpClient.sendMessage("客户端发送信息"); return ResponseEntity.ok("Message sent"); } catch (IOException e) { return ResponseEntity.status(503).body("Server unavailable"); } }
配置文件
socket: client: host: 192.168.3.8 #服务端ip地址 port: 8088 #监听端口 timeout: 5000 heartbeat-interval: 3000 # 心跳间隔30秒 heartbeat-timeout: 6000 # 心跳超时60秒 server: port: 8082
客户端发送信息后,服务端会接收到信息。
总结
以上就是java接入socket通信服务端与客户端的全部代码,二者实现了互相通信,具体的业务场景则需要小伙伴们在此基础上额外的设计逻辑了。
以上就是Java实现Socket服务端与客户端双向通信功能的详细内容,更多关于Java Socket服务端与客户端通信的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论