notes/messaging/kafka/avro-serialize.txt
Ihar Hancharenka d82bbe85b2 m
2025-08-01 10:17:49 +03:00

85 строки
3.2 KiB
Plaintext

2020
https://habr.com/ru/post/492312/
https://github.com/avrylkov/avro-kafka
// Kafka producer configuration that uses Confluent's Avro serializer for both
// record keys and record values.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// The serializer class name must be a single unbroken string literal; it is
// resolved by name at runtime, so a line-wrapped/hyphenated value cannot load.
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// `url` is defined outside this snippet and is passed through unchanged as the
// schema registry address.
props.put("schema.registry.url", url);
import ...our.avro.SchemaRegistry;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
 * Serializes an {@link OurDataMessage} to Avro binary and prefixes it with a
 * five-byte header.
 *
 * <p>Resulting layout:
 * <ol>
 *   <li>1 byte: {@code MAGIC_BYTE} (value {@code 0});</li>
 *   <li>4 bytes: {@code SchemaRegistry.CURRENT.version()}, written with
 *       {@link ByteBuffer#putInt(int)} (big-endian, the buffer's default order);</li>
 *   <li>the Avro binary encoding of the message.</li>
 * </ol>
 */
public class AvroOurDataMessageSerializer implements OurDataMessageSerializer<byte[]> {

    private static final byte MAGIC_BYTE = 0;
    private static final int HEADER_SIZE = 5;

    private final EncoderFactory encoderFactory = EncoderFactory.get();

    /**
     * Encodes the message and frames it with the magic byte and schema version.
     *
     * @param message the message to serialize
     * @return header followed by the Avro-encoded message
     * @throws SerializationException if encoding or framing fails; any
     *         {@link IOException} or {@link RuntimeException} raised on the way
     *         is attached as the cause
     */
    @Override
    public byte[] serialize(OurDataMessage message) {
        try {
            byte[] body = encodeBody(message);
            // Header first, then the encoded body; the buffer is sized exactly,
            // so its backing array is the complete framed payload.
            return ByteBuffer.allocate(HEADER_SIZE + body.length)
                    .put(MAGIC_BYTE)
                    .putInt(SchemaRegistry.CURRENT.version())
                    .put(body)
                    .array();
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing OurDataMessage Avro message", e);
        }
    }

    /** Produces the bare Avro binary encoding of the message, without any header. */
    private byte[] encodeBody(OurDataMessage message) throws IOException {
        try (ByteArrayOutputStream sink = new ByteArrayOutputStream()) {
            BinaryEncoder avroEncoder = encoderFactory.directBinaryEncoder(sink, null);
            DatumWriter<OurDataMessage> datumWriter = new SpecificDatumWriter<>(OurDataMessage.class);
            datumWriter.write(message, avroEncoder);
            avroEncoder.flush();
            return sink.toByteArray();
        }
    }
}
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
/**
 * Deserializes an {@link OurDataMessage} from Avro binary that may or may not
 * carry a five-byte header (one magic byte followed by a four-byte writer
 * schema version).
 *
 * <p>If the first byte equals {@code MAGIC_BYTE}, the next four bytes are read
 * as the writer schema version and the rest is the Avro body. Otherwise the
 * whole input is treated as the Avro body and writer schema version {@code 1}
 * is used.
 *
 * <p>NOTE(review): detection looks only at the first byte, so a header-less
 * payload that happens to start with {@code 0} will be parsed as if it had a
 * header. Confirm that such payloads cannot occur.
 */
public class AvroOurDataMessageDeserializer implements OurDataMessageDeserializer<byte[]> {

    private static final byte MAGIC_BYTE = 0;
    private static final int HEADER_SIZE = 5;

    private final DecoderFactory decoderFactory = DecoderFactory.get();

    /**
     * Decodes a payload using the writer schema looked up by version and the
     * current schema as the reader schema.
     *
     * @param payloadWithHeader the raw bytes, with or without the header
     * @return the decoded message
     * @throws SerializationException if the input cannot be parsed or decoded;
     *         any {@link IOException} or {@link RuntimeException} raised on the
     *         way is attached as the cause
     */
    @Override
    public OurDataMessage deserialize(byte[] payloadWithHeader) {
        try {
            ByteBuffer input = ByteBuffer.wrap(payloadWithHeader);
            final int writerVersion;
            final byte[] body;
            if (input.get() == MAGIC_BYTE) {
                writerVersion = input.getInt();
                // Everything after the magic byte and the version int is the Avro body.
                body = new byte[input.remaining()];
                input.get(body);
            } else {
                // No header: fall back to schema version 1 and decode the input as-is.
                writerVersion = 1;
                body = payloadWithHeader;
            }
            DatumReader<OurDataMessage> datumReader = new SpecificDatumReader<>(
                    SchemaRegistry.getSchema(writerVersion),
                    SchemaRegistry.CURRENT.schema());
            return datumReader.read(null, decoderFactory.binaryDecoder(body, null));
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing OurDataMessage Avro message to schema [" + OurDataMessage.getClassSchema().toString() + "]", e);
        }
    }
}