Dynamic Parsing of Protobuf Schemas

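Motivation

Protobuf is an efficient serialization and deserialization library from Google, but it has one small drawback: you must first define the data structure in a .proto file and then compile it into the corresponding Java classes.

My requirement is this: another party gives me the data structure together with the serialized data. I parse the data structure dynamically into a Schema, deserialize the data, and then read the values out according to that Schema.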
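
To see why the Schema has to travel with the data: serialized Protobuf bytes carry only field numbers and wire types, not field names, so names such as id or timestamp can only be recovered by looking the numbers up in the Schema. The following is a minimal sketch of that lookup using only the JDK. It handles varint fields only, and the class VarintFieldDecoder and its schema map (field number to field name) are illustrative assumptions, not part of any library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: decodes varint fields by looking field numbers up in a schema map. */
final class VarintFieldDecoder {

    /**
     * @param data   serialized message containing only varint fields (wire type 0)
     * @param schema field number -> field name, as a parsed schema would provide
     */
    static Map<String, Long> decode(byte[] data, Map<Integer, String> schema) {
        Map<String, Long> values = new LinkedHashMap<>();
        int[] pos = {0};
        while (pos[0] < data.length) {
            long key = readVarint(data, pos);
            int fieldNumber = (int) (key >>> 3); // upper bits: field number
            int wireType = (int) (key & 0x7);    // lower 3 bits: wire type
            if (wireType != 0) {
                throw new IllegalArgumentException("this sketch handles varint fields only");
            }
            long value = readVarint(data, pos);
            String name = schema.get(fieldNumber);
            // Without a schema entry, the field number is all that is known.
            values.put(name != null ? name : "unknown#" + fieldNumber, value);
        }
        return values;
    }

    /** Reads a base-128 varint starting at pos[0] and advances pos[0] past it. */
    private static long readVarint(byte[] data, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    public static void main(String[] args) {
        // Field 1 (id) = 150, field 3 (timestamp) = 2
        byte[] data = {0x08, (byte) 0x96, 0x01, 0x18, 0x02};
        Map<Integer, String> schema = new LinkedHashMap<>();
        schema.put(1, "id");
        schema.put(3, "timestamp");
        System.out.println(decode(data, schema));
    }
}
```

For the bytes 08 96 01 18 02 and a schema mapping 1 to id and 3 to timestamp, decode yields id = 150 and timestamp = 2. With an empty schema the same bytes can only be reported as unknown#1 and unknown#3.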

Preparation

<dependency>
    <groupId>io.protostuff</groupId>
    <artifactId>protostuff-parser</artifactId>
    <version>2.0.0-alpha32</version>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.5.1</version>
</dependency>

Data structure

syntax = "proto3";

 enum Type {
     TRANSACTION = 0;
     EVENT = 1;
 }

 message MyMessage {
     int32 id = 1;
     Type type = 2;
     int64 timestamp = 3;
     map<string,string> tags = 4;
     repeated MyMessage children = 5;
 }

Parsing

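    /**
     * Parses a .proto definition into a schema.
     * @param messageId ID of the message
     * @param input contents of the .proto file
     * @return the corresponding schema, or null if input is null
     * @throws Exception if the temporary file cannot be written or the definition cannot be parsed
     */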
    public DynamicSchema parseProto(long messageId, String input) throws Exception {
        DynamicSchema schema = null;
        if (input != null) {
            String protoFile = UUID.randomUUID() + ".proto";
            Path tempDirectory = Files.createTempDirectory(PB_TEMP_DIRECTORY);
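            // Encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default charset
            Path file = Files.write(tempDirectory.resolve(protoFile), input.getBytes(StandardCharsets.UTF_8));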

            List<String> msgTypeNames = parseAndGetMsgType(input);
            try {
                DynamicSchema.Builder schemaBuilder = DynamicSchema.newBuilder();
                schemaBuilder.setName(SCHEMA_PREFIX + messageId);
                if (msgTypeNames != null) {
                    for (String msgType : msgTypeNames) {
                        schemaBuilder.addMessageDefinition(defineMessage(tempDirectory, protoFile, msgType));
                    }
                    msgTypeOfChannel.put(messageId, msgTypeNames);
                }
                schema = schemaBuilder.build();
            } finally {
                Files.delete(file);
                Files.delete(tempDirectory);
            }
        }
        return schema;
    }

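    /**
     * Extracts the data structure (message fields) described by a schema.
     * @param schema the schema returned by parseProto
     * @param messageId ID of the message
     * @return the field map of each message type, keyed by message type name
     * @throws Exception
     */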
    public Map<String, Map<String, Object>> parseSchema(DynamicSchema schema, long messageId) throws Exception {
        Map<String, Map<String, Object>> fieldMap = new LinkedHashMap<>();
        List<String> msgTypeNames = msgTypeOfChannel.get(messageId);
        if (msgTypeNames != null) {
            for (String msgTypeName : msgTypeNames) {
                Descriptors.Descriptor descriptor = schema.getMessageDescriptor(msgTypeName);
                Map<String, Object> fm = getFieldMap(descriptor);
                if (fm != null) {
                    fieldMap.put(msgTypeName, fm);
                }
                List<Descriptors.Descriptor> nestedDescriptors = descriptor.getNestedTypes();
                if (fm != null) {
                    for (Descriptors.Descriptor ds : nestedDescriptors) {
                        // each nested type is recorded as a map field (e.g. tags)
                        fm.put(ds.getName(), getFieldType("map", null));
                    }
                }
            }
        }
        return fieldMap;
    }
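
parseProto relies on a helper, parseAndGetMsgType, whose body is not shown here. As a rough stand-in, the sketch below collects message names from the .proto text with a regular expression, using only the JDK. This is a deliberately simpler technique than a real parser and is an assumption about what the helper returns: the actual helper presumably goes through the protostuff-parser dependency. The class name MessageNameScanner is hypothetical, and the sketch does not tell nested messages from top-level ones or skip commented-out declarations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for parseAndGetMsgType: lists message names found in .proto text. */
final class MessageNameScanner {

    // Matches "message Name {" at the start of a line, after optional whitespace.
    private static final Pattern MESSAGE_DECL =
            Pattern.compile("^\\s*message\\s+([A-Za-z_][A-Za-z0-9_]*)\\s*\\{", Pattern.MULTILINE);

    static List<String> messageNames(String proto) {
        List<String> names = new ArrayList<>();
        Matcher matcher = MESSAGE_DECL.matcher(proto);
        while (matcher.find()) {
            names.add(matcher.group(1)); // group 1 is the message name
        }
        return names;
    }

    public static void main(String[] args) {
        String proto = "syntax = \"proto3\";\n"
                + "message MyMessage {\n"
                + "    int32 id = 1;\n"
                + "    repeated MyMessage children = 5;\n"
                + "}\n";
        System.out.println(messageNames(proto));
    }
}
```

A field line such as "repeated MyMessage children = 5;" is not picked up, because the pattern requires the message keyword at the start of the line.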

Test

Parsing with the methods above produces the following Schema:

types: [MyMessage, MyMessage.tags]
enums: [MyMessage.Type]
file {
  name: "pb_schema_1"
  message_type {
    name: "MyMessage"
    field {
      name: "id"
      number: 1
      label: LABEL_OPTIONAL
      type: TYPE_INT32
    }
    field {
      name: "timestamp"
      number: 3
      label: LABEL_OPTIONAL
      type: TYPE_INT64
    }
    field {
      name: "children"
      number: 5
      label: LABEL_REPEATED
      type_name: "MyMessage"
    }
    nested_type {
      name: "tags"
      field {
        name: "key"
        number: 1
        label: LABEL_OPTIONAL
        type: TYPE_STRING
      }
      field {
        name: "value"
        number: 2
        label: LABEL_OPTIONAL
        type: TYPE_STRING
      }
    }
    enum_type {
      name: "Type"
      value {
        name: "TRANSACTION"
        number: 0
      }
      value {
        name: "EVENT"
        number: 1
      }
    }
  }
}
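
The last step from the motivation, deserializing the data and reading values through the Schema, is not shown in this post. A minimal sketch of it follows, assuming protobuf-java is on the classpath. It builds a cut-down descriptor by hand (only id and timestamp) so that it runs on its own; a Descriptors.Descriptor obtained from schema.getMessageDescriptor, as in parseSchema, should be usable in the same way. The class DynamicReadSketch and its methods are hypothetical names for illustration.

```java
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;

/** Hypothetical sketch: read a field from serialized bytes using only a runtime descriptor. */
final class DynamicReadSketch {

    /** Builds a descriptor for a cut-down MyMessage with only id and timestamp. */
    static Descriptor buildDescriptor() {
        DescriptorProto message = DescriptorProto.newBuilder()
                .setName("MyMessage")
                .addField(field("id", 1, FieldDescriptorProto.Type.TYPE_INT32))
                .addField(field("timestamp", 3, FieldDescriptorProto.Type.TYPE_INT64))
                .build();
        FileDescriptorProto file = FileDescriptorProto.newBuilder()
                .setName("my_message.proto")
                .setSyntax("proto3")
                .addMessageType(message)
                .build();
        try {
            return FileDescriptor.buildFrom(file, new FileDescriptor[0])
                    .findMessageTypeByName("MyMessage");
        } catch (DescriptorValidationException e) {
            throw new IllegalStateException(e);
        }
    }

    private static FieldDescriptorProto field(String name, int number,
                                              FieldDescriptorProto.Type type) {
        return FieldDescriptorProto.newBuilder()
                .setName(name)
                .setNumber(number)
                .setLabel(FieldDescriptorProto.Label.LABEL_OPTIONAL)
                .setType(type)
                .build();
    }

    /** Deserializes data against the descriptor and returns the named field's value. */
    static Object readField(Descriptor descriptor, byte[] data, String fieldName) {
        try {
            DynamicMessage message = DynamicMessage.parseFrom(descriptor, data);
            return message.getField(descriptor.findFieldByName(fieldName));
        } catch (InvalidProtocolBufferException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor = buildDescriptor();
        // Stand-in for the bytes the other party would send.
        byte[] data = DynamicMessage.newBuilder(descriptor)
                .setField(descriptor.findFieldByName("id"), 42)
                .setField(descriptor.findFieldByName("timestamp"), 1500000000000L)
                .build()
                .toByteArray();
        System.out.println(readField(descriptor, data, "id"));
        System.out.println(readField(descriptor, data, "timestamp"));
    }
}
```

The value comes back as Object; for an int32 field it is an Integer and for an int64 field a Long, so callers that need a concrete type have to check the field's type in the descriptor first.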

Related code:

https://github.com/shuaiweili/ser-des-example/blob/master/src/test/java/com/git/lee/serde/example/pb/DynamicSchemaTest.java
