// 2020
// https://habr.com/ru/post/492312/
// https://github.com/avrylkov/avro-kafka

// Kafka client properties that select Confluent's KafkaAvroSerializer for both keys and values
// and point it at a schema registry (`url` is defined outside this snippet).
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);

import ...our.avro.SchemaRegistry;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.errors.SerializationException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Serializes {@code OurDataMessage} instances to Avro binary, prefixed with a 5-byte header:
 * one magic byte ({@code 0}) followed by the 4-byte value of
 * {@code SchemaRegistry.CURRENT.version()}.
 */
public class AvroOurDataMessageSerializer implements OurDataMessageSerializer {

    /** First byte of every framed payload. */
    private static final byte MAGIC_BYTE = 0;

    /** Header length in bytes: 1 magic byte + 4-byte schema version. */
    private static final int HEADER_SIZE = 5;

    private final EncoderFactory encoderFactory = EncoderFactory.get();

    /**
     * Encodes the message as {@code [magic byte][schema version][Avro binary body]}.
     *
     * @param message the message to encode
     * @return the header followed by the Avro-encoded body
     * @throws SerializationException if encoding fails with an {@link IOException} or any
     *     {@link RuntimeException}; the original exception is kept as the cause
     */
    @Override
    public byte[] serialize(OurDataMessage message) {
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            // Parameterized (not raw) writer so the write call is type-checked.
            DatumWriter<OurDataMessage> writer = new SpecificDatumWriter<>(OurDataMessage.class);
            BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
            writer.write(message, encoder);
            encoder.flush();

            byte[] body = out.toByteArray();
            ByteBuffer framed = ByteBuffer.allocate(HEADER_SIZE + body.length);
            framed.put(MAGIC_BYTE);
            framed.putInt(SchemaRegistry.CURRENT.version());
            framed.put(body);
            return framed.array();
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing OurDataMessage Avro message", e);
        }
    }
}

import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;

/**
 * Deserializes Avro binary payloads into {@code OurDataMessage}, resolving the writer's schema
 * (looked up by the version found in the header) against {@code SchemaRegistry.CURRENT.schema()}.
 */
public class AvroOurDataMessageDeserializer implements OurDataMessageDeserializer {

    /** First byte expected on payloads that carry a header. */
    private static final byte MAGIC_BYTE = 0;

    /** Header length in bytes: 1 magic byte + 4-byte schema version. */
    private static final int HEADER_SIZE = 5;

    /**
     * Writer schema version used when the payload does not start with the magic byte.
     * NOTE(review): presumably header-less payloads are legacy messages written with
     * schema version 1 - confirm.
     */
    private static final int HEADERLESS_SCHEMA_VERSION = 1;

    private final DecoderFactory decoderFactory = DecoderFactory.get();

    /**
     * Decodes a payload that may or may not start with the 5-byte header.
     *
     * <p>If the first byte equals the magic byte, the next four bytes are read as the writer
     * schema version and the remainder is the Avro body. Otherwise the whole array is treated
     * as the Avro body and {@link #HEADERLESS_SCHEMA_VERSION} is assumed.
     *
     * <p>NOTE(review): a header-less payload whose first byte happens to be {@code 0} is
     * indistinguishable from a framed one and will be parsed as framed.
     *
     * @param payloadWithHeader the raw bytes, with or without the header
     * @return the decoded message
     * @throws SerializationException if decoding fails with an {@link IOException} or any
     *     {@link RuntimeException} (including a null, empty, or truncated payload); the
     *     original exception is kept as the cause
     */
    @Override
    public OurDataMessage deserialize(byte[] payloadWithHeader) {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(payloadWithHeader);
            int writerSchemaVersion = HEADERLESS_SCHEMA_VERSION;
            byte[] payload = payloadWithHeader;
            if (hasMagicByte(buffer.get())) {
                writerSchemaVersion = buffer.getInt();
                payload = new byte[payloadWithHeader.length - HEADER_SIZE];
                buffer.get(payload);
            }
            // Parameterized (not raw) reader so read(...) is typed as OurDataMessage
            // instead of Object.
            DatumReader<OurDataMessage> reader = new SpecificDatumReader<>(
                    SchemaRegistry.getSchema(writerSchemaVersion),
                    SchemaRegistry.CURRENT.schema());
            return reader.read(null, decoderFactory.binaryDecoder(payload, null));
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing OurDataMessage Avro message to schema ["
                    + OurDataMessage.getClassSchema().toString() + "]", e);
        }
    }

    /** Returns whether the given leading byte equals {@link #MAGIC_BYTE}. */
    private static boolean hasMagicByte(byte magicByte) {
        return magicByte == MAGIC_BYTE;
    }
}