使用Java实现Redis客户端

2022/11/11 Redis

# Redis通信协议-RESP协议

Redis是一个CS架构的软件,通信一般分两步(不包括pipeline和PubSub):

客户端(client)向服务端(server)发送一条命令

服务端解析并执行命令,返回响应结果给客户端

因此客户端发送命令的格式、服务端响应结果的格式必须有一个规范,这个规范就是通信协议。

而在Redis中采用的是RESP(Redis Serialization Protocol)协议:

  • Redis 1.2版本引入了RESP协议
  • Redis 2.0版本中成为与Redis服务端通信的标准,称为RESP2
  • Redis 6.0版本中,从RESP2升级到了RESP3协议,增加了更多数据类型并且支持6.0的新特性--客户端缓存

但目前,默认使用的依然是RESP2协议,也是我们要学习的协议版本(以下简称RESP)。

在RESP中,通过首字节的字符来区分不同数据类型,常用的数据类型包括5种:

  • 单行字符串:首字节是 ‘+’ ,后面跟上单行字符串,以CRLF( "\r\n" )结尾。例如返回"OK": "+OK\r\n"
  • 错误(Errors):首字节是 ‘-’ ,与单行字符串格式一样,只是字符串是异常信息,例如:"-Error message\r\n"
  • 数值:首字节是 ‘:’ ,后面跟上数字格式的字符串,以CRLF结尾。例如:":10\r\n"
  • 多行字符串:首字节是 ‘$’ ,表示二进制安全的字符串,最大支持512MB:
    • 如果大小为0,则代表空字符串:"$0\r\n\r\n"
    • 如果大小为-1,则代表不存在:"$-1\r\n"
  • 数组:首字节是 ‘*’,后面跟上数组元素个数,再跟上元素,元素数据类型不限

Redis支持TCP通信,因此我们可以使用Socket来模拟客户端,与Redis服务端建立连接:

public class RedisClient {
    /** Redis中 RESP 协议头 */
    private final static char SIMPLE_STRING = '+';
    private final static char ERRORS = '-';
    private final static char NUMBER = ':';
    private final static char MULTI_STRING = '$';
    private final static char ARRAY = '*';
    private final static String NEW_LINEAR = "\r\n";
	/** Redis服务端连接信息 */
    private final static String CONN_HOST = "localhost";
    private final static Integer CONN_PORT = 6379;
    
    private Socket socket;
    private InputStream is;
    private OutputStream os;

    public RedisClient() {
        try {
            // 使用Socket进行远程连接
            socket = new Socket(CONN_HOST, CONN_PORT);
            is = socket.getInputStream();
            os = socket.getOutputStream();
        } catch (IOException e) {
            e.printStackTrace();
            close();
        }
    }
	
    public void sendRequest(String msg) throws IOException {
        // 解析命令
        String[] commands = parseCommand(msg);
        // 进行命令的拼接写出
        StringBuilder sb = new StringBuilder();
        // *3\r\n 命令数组头
        sb.append(ARRAY).append(commands.length).append(NEW_LINEAR);
        for (String command : commands) {
            // $4\r\nname\r\n
            sb.append(MULTI_STRING).append(command.length()).append(NEW_LINEAR).append(command).append(NEW_LINEAR);
        }
        os.write(sb.toString().getBytes(StandardCharsets.UTF_8));
        os.flush();
    }

    private String[] parseCommand(String commandStr) {
        return commandStr.split(" ");
    }

    public Object handlerResponse() throws IOException {
        int read = is.read();
        switch (read) {
            case SIMPLE_STRING:
            case ERRORS:
                return new BufferedReader(new InputStreamReader(is)).readLine();
            case NUMBER:
                return readNumber();
            case MULTI_STRING:
                return readMulti();
            case ARRAY:
                return readArrays();
            default:
                throw new RuntimeException("结果有误");
        }
    }

    private Object readMulti() throws IOException {
        int count = readNumber().intValue();
        if (count == -1) {
            return null;
        }
        if (count == 0) {
            return "";
        }
        return readLine(count);
    }

    private Long readNumber() throws IOException {
        byte[] bytes = new byte[1024];
        int temp;
        int count = 0;
        while ((temp = is.read()) != '\n') {
            if (temp != '\r') {
                bytes[count++] = (byte) temp;
            }
        }
        return Long.parseLong(new String(bytes, 0, count));
    }

    private Object readArrays() throws IOException {
        int count = readNumber().intValue();
        List<Object> data = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            data.add(handlerResponse());
        }
        return data;
    }

    private String readLine(int count) throws IOException {
        byte[] bytes = new byte[1024];
        for (int i = 0; i < count + NEW_LINEAR.length(); i++) {
            int temp = is.read();
            if (temp != '\r' && temp != '\n') {
                bytes[i] = (byte) temp;
            }
        }
        return new String(bytes, 0, count, StandardCharsets.UTF_8);
    }

    public void close() {
        try {
            if (socket != null) {
                socket.close();
            }
            if (is != null) {
                is.close();
            }
            if (os != null) {
                os.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        RedisClient redisClient = new RedisClient();
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            System.out.print("[INFO] 请输入redis命令>>> ");
            String command = reader.readLine();
            if (command.equalsIgnoreCase("exit")) {
                redisClient.close();
                break;
            }
            redisClient.sendRequest(command);
            System.out.println(redisClient.handlerResponse());
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138