2020
https://habr.com/ru/post/492312/
https://github.com/avrylkov/avro-kafka

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);
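
These properties wire Confluent's KafkaAvroSerializer into a plain Kafka producer. A minimal usage sketch, assuming OurDataMessage is an Avro-generated SpecificRecord and "our-data" is an illustrative topic name:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Sketch only: the topic name and message construction are assumptions.
try (KafkaProducer<String, OurDataMessage> producer = new KafkaProducer<>(props)) {
    OurDataMessage message = OurDataMessage.newBuilder().build(); // field setters omitted
    producer.send(new ProducerRecord<>("our-data", message));
}

On first send the Confluent serializer registers the schema with Schema Registry and prefixes every record with a magic byte plus a 4-byte schema id. The hand-rolled serializer below mimics that wire format, but carries its own schema version number instead of a registry id.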
import ...our.avro.SchemaRegistry;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class AvroOurDataMessageSerializer implements OurDataMessageSerializer<byte[]> {

    private static final byte MAGIC_BYTE = 0;
    // 1 magic byte + 4-byte writer schema version
    private static final int HEADER_SIZE = 5;

    private final EncoderFactory encoderFactory = EncoderFactory.get();

    @Override
    public byte[] serialize(OurDataMessage message) {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            DatumWriter<OurDataMessage> writer = new SpecificDatumWriter<>(OurDataMessage.class);
            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
            writer.write(message, encoder);
            encoder.flush();
            byte[] bytes = out.toByteArray();
            // Wire format: [magic byte][4-byte schema version][Avro binary payload]
            ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + bytes.length);
            buffer.put(MAGIC_BYTE);
            buffer.putInt(SchemaRegistry.CURRENT.version());
            buffer.put(bytes);
            return buffer.array();
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing OurDataMessage Avro message", e);
        }
    }
}
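
Both classes lean on a hand-rolled SchemaRegistry (imported from the elided ...our.avro package) rather than Confluent's service. Its implementation isn't shown; a minimal sketch of the shape the code assumes (an enum mapping writer-schema versions to Avro schemas, with CURRENT pointing at the newest) might look like this; the resource path and version numbers are made up:

import org.apache.avro.Schema;

// Hypothetical sketch of the SchemaRegistry referenced above.
// Each constant pins a writer-schema version to a concrete Avro schema.
public enum SchemaRegistry {
    V1(1, loadSchema("/avro/our-data-message-v1.avsc")), // older schema kept as a resource
    V2(2, OurDataMessage.getClassSchema());              // newest, from the generated class

    public static final SchemaRegistry CURRENT = V2;

    private final int version;
    private final Schema schema;

    SchemaRegistry(int version, Schema schema) {
        this.version = version;
        this.schema = schema;
    }

    public int version() { return version; }

    public Schema schema() { return schema; }

    // Resolve a writer schema by the version carried in the message header.
    public static Schema getSchema(int version) {
        for (SchemaRegistry entry : values()) {
            if (entry.version == version) {
                return entry.schema;
            }
        }
        throw new IllegalArgumentException("Unknown schema version: " + version);
    }

    private static Schema loadSchema(String resource) {
        try (java.io.InputStream in = SchemaRegistry.class.getResourceAsStream(resource)) {
            return new Schema.Parser().parse(in);
        } catch (java.io.IOException e) {
            throw new java.io.UncheckedIOException(e);
        }
    }
}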

import ...our.avro.SchemaRegistry;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.common.errors.SerializationException;

import java.io.IOException;
import java.nio.ByteBuffer;

public class AvroOurDataMessageDeserializer implements OurDataMessageDeserializer<byte[]> {

    private static final byte MAGIC_BYTE = 0;
    // 1 magic byte + 4-byte writer schema version
    private static final int HEADER_SIZE = 5;

    private final DecoderFactory decoderFactory = DecoderFactory.get();

    @Override
    public OurDataMessage deserialize(byte[] payloadWithHeader) {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(payloadWithHeader);
            // Messages written before the header was introduced carry no magic
            // byte; assume they were written with schema version 1.
            int writerSchemaVersion = 1;
            byte[] payload = payloadWithHeader;
            if (hasMagicByte(buffer.get())) {
                writerSchemaVersion = buffer.getInt();
                payload = new byte[payloadWithHeader.length - HEADER_SIZE];
                buffer.get(payload);
            }
            // Decode with the writer's schema, resolving to the current reader schema.
            DatumReader<OurDataMessage> reader =
                    new SpecificDatumReader<>(SchemaRegistry.getSchema(writerSchemaVersion), SchemaRegistry.CURRENT.schema());
            return reader.read(null, decoderFactory.binaryDecoder(payload, null));
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing OurDataMessage Avro message to schema [" + OurDataMessage.getClassSchema().toString() + "]", e);
        }
    }

    private static boolean hasMagicByte(byte magicByte) {
        return magicByte == MAGIC_BYTE;
    }
}
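
A quick round trip through the two classes, with message construction elided since the .avsc fields aren't shown here:

AvroOurDataMessageSerializer serializer = new AvroOurDataMessageSerializer();
AvroOurDataMessageDeserializer deserializer = new AvroOurDataMessageDeserializer();

// Field setters omitted; the builder shape depends on the actual .avsc.
OurDataMessage original = OurDataMessage.newBuilder().build();

byte[] wire = serializer.serialize(original);
// wire[0] is the magic byte 0; wire[1..4] hold the current schema version.
OurDataMessage decoded = deserializer.deserialize(wire);

Note that the headerless fallback is a heuristic: a legacy payload whose first Avro byte happens to be 0 would be mistaken for a headered one, so the scheme only works cleanly once all pre-header messages have aged out or are known not to start with a zero byte.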