开发者

Java实现Socket服务端与客户端双向通信功能

目录
  • 原理
  • 代码实现
    • 服务端
    • 客户端
  • 总结

    原理

    Socket通信是计算机网络中常用的一种通信机制,它是基于TCP/IP协议实现的,提供了两个应用程序之间通过网络进行数据交换的能力。Socket本质上是一种抽象概念,为网络服务提供了一组API接口。

    • Socket通信模型

    Socket通信模型通常包括客户端和服务器端两部分。 服务器端:负责在特定的端口监听来自客户端的连接请求,当一个请求到达时,服务器会与客户端建立连接,并为客户端提供相应的服务。 客户端:主动向服务器的特定IP地址和端口发起连接请求,连接成功后,客户端可以通过建立的连接向服务器发送请求并接收响应。

    • Socket通信过程

    Socket通信过程一般包括以下几个步骤:

    • 服务器监听:

    服务器通过socket()函数创建一个Socket,并通过bind()函数将其绑定到一个IP地址和端口上。然后,服务器调用listen()函数开始监听该端口上的连接请求。

    • 客户端请求连接:

    客户端也通过socket()函数创建一个Socket,然后调用connect()函数尝试与服务器的指定IP地址和端口建立连接。

    • 服务器接受连接:

    服务器在接收到客户端的连接请求后,通过accept()函数接受这个连接。如果成功,accept()函数会返回一个新的Socket(通常称为“子Socket”),用于与该客户端进行通信。

    数据传输:连接建立成功后,客户端和服务器就可以通过新建立的Socket进行数据传输了。数据传输可以是单向的也可以是双向的。应用程序可以使用send(), write(), recv(), read()等函数进行数据发送和接收操作。

    • 断开连接:

    当通信结束后,客户端和服务器都可以调用close()函数来关闭自己持有的Socket,从而断开两者之间的连接。 TCP vs UDP 在实际使用中,基于Socket的通信方式主要有两种:基于TCP和基于UDP。 TCP Socket:提供可靠、面向连接、基于字节流的通信方式。适用对数据完整性和顺序有要求的应用场景。 UDP Socket:提供无连接、不保证可靠性、基于消息(数据报)的通信方式。适用于对实时性要求高、容忍部分数据丢失或乱序的应用场景。

    代码实现

    服务端

    服务端主体逻辑:和每个接入的客户端都会使用独立线程建立起长连接,二者之间使用心跳保持联系,使用clientSockets 存储了每个客户端的信息便于和客户端建立起联系。

    package com.example.demo2.server.socket;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.stereotype.Component;
    
    import Javax.annotation.PostConstruct;
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * @author kim
     */
    @Component
    public class TcpServer implements DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(TcpServer.class);
    
        private final SocketServerConfig config;
        private ServerSocket serverSocket;
        private ExecutorService executorService;
        private volatile boolean running = true;
    
        // 存储客户端连接
        private final Map<String, Socket> clientSockets = new ConcurrentHashMap<>();
    
        public TcpServer(SocketServerConfig config) {
            this.config = config;
        }
    
        @PostConstruct
        public void start() throws IOException {
            executorService = Executors.newFixedThreadPool(config.getMaxThreads());
            serverSocket = new ServerSocket(config.getPort());
            logger.info("平台socket服务已启动, 监听端口为 {}", config.getPort());
    
            new Thread(this::acceptConnections).start();
        }
    
        private void acceptConnections() {
            while (running) {
                try {
                    Socket clientSocket = serverSocket.accept();
                    String clientAddress = clientSocket.getInetAddrhttp://www.devze.comess().getHostAddress();
                    clientSockets.put(clientAddress, clientSocket);
                    executorService.execute(new ClientHandler(clientSocket, clientAddress));
                } catch (IOException e) {
                    if (running) {
                        logger.error("Connection accept error", e);
                    }
                }
            }
        }
    
        // 用于发送消息到特定客户端
        public void sendMessageToClient(String clientAddress, String message) throws IOException {
            Socket socket = clientSockets.get(clientAddress);
            if (socket != null && !socket.isClosed()) {
                PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
                out.println(message);
                logger.info("Sent message to {}: {}", clientAddress, message);
            } else {
                logger.warn("Client {} is not connected or socket is closed", clientAddress);
            }
        }
    
        @Override
        public void destroy() throws Exception {
            running = false;
            executorService.shutdown();
            for (Socket socket :编程 clientSockets.values()) {
                if (!socket.isClosed()) {
                    socket.close();
                }
            }
            if (serverSocket != null && !serverSocket.isClosed()) {
                serverSocket.close();
            }
            logger.info("TCP Server stopped");
        }
    
        private class ClientHandler implements Runnable {
            private final Socket clientSocket;
            private final String clientAddress;
    
            ClientHandler(Socket socket, String address) {
                this.clientSocket = socket;
                this.clientAddress = address;
            }
    
            @Override
            public void run() {
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(clientSocket.getInputStream()));
                     PrintWriter out = new PrintWriter(
                             clientSocket.getOutputStream(), true)) {
                    logger.info("Client connected: {}", clientAddress);
                    String input;
                    while ((input = in.readLine()) != null) {
                        logger.debug("Received: {}", input);
                        out.println(input);
                        logger.info("Client connected: {}", clientAddress);
                    }
                } catch (IOException e) {
                    logger.warn("Client connection closed: {}", e.getMessage());
                } finally {
                    try {
                        clientSockets.remove(clientAddress);
                        clientSocket.close();
                    } catch (IOException e) {
                        logger.error("Error closing socket", e);
                    }
                }
            }
    
    
        }
    }
    

    配置类

    package com.example.demo2.server.socket;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ConfigurationProperties(prefix = "socket.server")
    @Data
    public class SocketServerConfig {
        private int port;
        private int maxThreads;
        
        // Getters and Setters
    }
    

    配置文件

    server:
      port: 8080
    
    socket:
      server:
        port: 8088
        maxThreads: 50
    

    向客户端发送测试信息

     @GetMapping("/send")
        public String sendMessage(String clientAddress) throws IOException {
           tcpServer.sendMessageToClient("192.168.3.8","77777777777");
           return "success";
        }
    

    Java实现Socket服务端与客户端双向通信功能

    服务端发送日志

    Java实现Socket服务端与客户端双向通信功能

    客户端

    客户端主体逻辑,使用自己设计的心跳机制,监听服务端状态,如果服务端断开连接,则客户端会尝试重新建立联系。

    package com.example.demo1.socketclient;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Service;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.net.SocketTimeoutException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    @Service
    public class TcpClientService implements ApplicationRunner, DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(TcpClientServiphpce.class);
    
        private final SocketClientConfig config;
        private Socket socket;
        private PrintWriter out;
        private BufferedReader in;
        private final AtomicBoolean running = new AtomicBoolean(true);
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final MessageListener messageListener;
    
        @Autowired
        public TcpClientService(SocketClientConfig config, MessageListener messageListener) {
            this.config = config;
            this.messageListener = messageListener;
        }
    
        @Override
        public void run(ApplicationArgumejsnts args) throws Exception {
            initializeConnection();
        }
    
        @Override
        public void destroy() throws Exception {
            running.set(false);
            closeResources();
            executor.shutdown();
        }
    
        private synchronized void initializeConnection() {
            new Thread(() -> {
                while (running.get()) {
                    try {
                        socket = new Socket();
                        socket.setKeepAlive(true);
                        socket.setSoTimeout(config.getHeartbeatTimeout());
                        socket.connect(new InetSocketAddress(config.getHost(), config.getPort()), config.getTimeout());
    
                        out = new PrintWriter(socket.getOutputStream(), true);
                        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    
                        logger.info("Connected to server {}:{}", config.getHost(), config.getPort());
    
                        executor.execute(this::listenToServer);
    
                        startHeartbeat();
    
                        while (!socket.isClosed() && socket.isConnected()) {
                            Thread.sleep(1000);
                        }
                    } catch (Exception e) {
                        logger.warn("Connection error: {}", e.getMessage());
                    } finally {
                        closeResources();
                        if (running.get()) {
                            logger.info("Attempting to reconnect in 5 seconds...");
                            sleepSafely(5000);
                        }
                    }
                }
            }, "tcp-client-connector").start();
        }
    
        private void listenToServer() {
            try {
                String response;
                while (running.get() && !socket.isClosed()) {
                    try {
                        response = in.readLine();
                        if (response == null) {
                            logger.warn("Server closed connection");
                            break; // 终止循环,表示连接已关闭
                        }
                        logger.debug("Received server message: {}", response);
                        messageListener.onMessage(response);
                    } catch (SocketTimeoutException e) {
                        logger.debug("Socket read timeout");
                    } catch (IOException e) {
                        if (!socket.isClosed()) {
                            logger.warn("Connection lost: {}", e.getMessage());
                            break; // 终止循环,表示连接已中断
                        }
                    }
                }
            } finally {
                closeResources(); // 确保资源关闭
            }
        }
    
        private void startHeartbeat() {
            new Thread(() -> {
                while (running.get() && !socket.isClosed()) {
                    try {
                        sendMessageInternal("HEARTBEAT");
                        sleepSafely(config.getHeartbeatInterval());
                    } catch (Exception e) {
                        logger.warn("Heartbeat failed: {}", e.getMessage());
                        break;
                    }
                }
            }, "heartbeat-thread").start();
        }
    
        public synchronized void sendMessage(String message) throws IOException {
            if (socket == null || !socket.isConnected()) {
                throw new IOException("Not connected to server");
            }
            out.println(message);
            logger.debug("Sent message: {}", message);
        }
    
        private synchronized void sendMessageInternal(String message) {
            try {
                if (socket != null && socket.isConnected()) {
                    out.println(message);
                }
            } catch (Exception e) {
                logger.warn("Failed to send heandroidartbeat");
            }
        }
    
        private synchronized void closeResources() {
            try {
                if (out != null) out.close();
                if (in != null) in.close();
                if (socket != null) socket.close();
            } catch (IOException e) {
                logger.warn("Error closing resources: {}", e.getMessage());
            }
        }
    
        private void sleepSafely(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    
        public interface MessageListener {
            void onMessage(String message);
        }
    }
    

    消息监听:监听服务发送的消息

    package com.example.demo1.socketclient;
    
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    
    @Component
    public class ServerMessageHandler implements TcpClientService.MessageListener {
        private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
    
        @Override
        public void onMessage(String message) {
           if(StringUtils.isNotEmpty(message)){
               if (!message.contains("HEARTBEAT")){
                   //处理其他逻辑
                   System.out.println("接收服务端消息成功:"+message);
               }else{
                   //心跳消息
                   System.out.println(message);
               }
           }
        }
    
    
    }
    

    配置类

    package com.example.demo1.socketclient;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    @ConfigurationProperties(prefix = "socket.client")
    @Data
    public class SocketClientConfig {
        private String host;
        private int port;
        private int timeout;
        private int heartbeatInterval;
        private int heartbeatTimeout;
    
        // Getters and Setters
    }
    

    发送测试方法

     @GetMapping("/send")
        public ResponseEntity<String> sendMessage() {
            try {
                tcpClient.sendMessage("客户端发送信息");
                return ResponseEntity.ok("Message sent");
            } catch (IOException e) {
                return ResponseEntity.status(503).body("Server unavailable");
            }
        }
    

    配置文件

    socket:
      client:
        host: 192.168.3.8 #服务端ip地址
        port: 8088 #监听端口
        timeout: 5000
        heartbeat-interval: 3000    # 心跳间隔30秒
        heartbeat-timeout: 6000     # 心跳超时60秒
    server:
      port: 8082
    
    
    

    客户端发送信息后,服务端会接收到信息。

    Java实现Socket服务端与客户端双向通信功能

    总结

    以上就是java接入socket通信服务端与客户端的全部代码,二者实现了互相通信,具体的业务场景则需要小伙伴们在此基础上额外的设计逻辑了。

    以上就是Java实现Socket服务端与客户端双向通信功能的详细内容,更多关于Java Socket服务端与客户端通信的资料请关注编程客栈(www.devze.com)其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜