zkClient java.io.StreamCorruptedException

前言

在使用com.101tec.zkclient读取zookeeper节点时出现了这样的错误:

1
java.io.StreamCorruptedException: invalid stream header: 7B0A2020

分析

查看默认的SerializableSerializer代码,具体的代码在org.I0Itec.zkclient.serialize包下。
用idea可以看到decompiled的字节码如下:

1
2
3
4
5
6
7
8
9
10
11
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        try {
            ObjectInputStream inputStream = new TcclAwareObjectIputStream(new ByteArrayInputStream(bytes));
            Object object = inputStream.readObject();
            return object;
        } catch (ClassNotFoundException var4) {
            throw new ZkMarshallingError("Unable to find object class.", var4);
        } catch (IOException var5) {
            throw new ZkMarshallingError(var5);
        }
    }

TcclAwareObjectIputStream继承了ObjectInputStream,new TcclAwareObjectIputStream对象时,调用了super的构造方法。

1
2
3
public TcclAwareObjectIputStream(InputStream in) throws IOException {
        super(in);
}

这个exception是在ObjectInputStream的构造方法中引发的,查看源码可以得知是readStreamHeader方法中有校验传入的stream头,这是java对象序列化后为流所特有的标记。只要传进来的流header校验不同过,在构造方法里就会触发异常。

1
2
3
4
5
6
7
8
9
protected void readStreamHeader()
    throws IOException, StreamCorruptedException
{
    short s0 = bin.readShort();
    short s1 = bin.readShort();
    if (s0 != STREAM_MAGIC || s1 != STREAM_VERSION) {
        throw new StreamCorruptedException(
            String.format("invalid stream header: %04X%04X", s0, s1));
}

然而我们存储在zookeeper节点上的数据并不是java对象,因此zkClient将获取到的bytes数据传入ObjectInputStream中后,就必然引发了异常。
其实deserialize方法传入的byte数组中的数据就是zookeeper节点上的数据了,我们只需要将其转为对应编码的String对象即可。同理,序列化的话,只需要将String对象根据编码转为对应的byte数组。
通过自定义实现ZkSerializer指定编码即可解决,自定义实现的Utf8ZkSerializer如下:

1
2
3
4
5
6
7
8
9
10
11
12
class Utf8ZkSerializer implements ZkSerializer {
        @Override
        public byte[] serialize(Object data) throws ZkMarshallingError {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }


        @Override
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

在创建完zkClient后使用如下方式设置ZkSerializer

1
2
zkClient.setZkSerializer(new Utf8ZkSerializer());

这样设置后,序列化与反序列化zk节点上的数据用的就是我们自定义的ZkSerializer了,即可正常存取zk节点上的内容。

作者

ZhongHuihong

发布于

2021-08-19

更新于

2021-10-04

许可协议