зеркало из
				https://github.com/iharh/notes.git
				synced 2025-10-30 05:06:05 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			85 строк
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			85 строк
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
| 2020
 | |
| https://habr.com/ru/post/492312/
 | |
|     https://github.com/avrylkov/avro-kafka
 | |
| 
 | |
| Properties props = new Properties();
 | |
| props.put("bootstrap.servers", "localhost:9092");
 | |
| props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerial-
 | |
| izer");
 | |
| props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerial-
 | |
| izer");
 | |
| props.put("schema.registry.url", url);
 | |
| 
 | |
| 
 | |
| import ...our.avro.SchemaRegistry;
 | |
| import org.apache.avro.io.BinaryEncoder;
 | |
| import org.apache.avro.io.DatumWriter;
 | |
| import org.apache.avro.io.EncoderFactory;
 | |
| import org.apache.avro.specific.SpecificDatumWriter;
 | |
| import org.apache.kafka.common.errors.SerializationException;
 | |
| 
 | |
| import java.io.ByteArrayOutputStream;
 | |
| import java.io.IOException;
 | |
| import java.nio.ByteBuffer;
 | |
| 
 | |
| public class AvroOurDataMessageSerializer implements OurDataMessageSerializer<byte[]> {
 | |
| 
 | |
|     private static final byte MAGIC_BYTE = 0;
 | |
| 
 | |
|     private static final int HEADER_SIZE = 5;
 | |
| 
 | |
|     private final EncoderFactory encoderFactory = EncoderFactory.get();
 | |
| 
 | |
|     @Override
 | |
|     public byte[] serialize(OurDataMessage message) {
 | |
|         try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
 | |
|             DatumWriter<OurDataMessage> writer = new SpecificDatumWriter<>(OurDataMessage.class);
 | |
|             BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
 | |
|             writer.write(message, encoder);
 | |
|             encoder.flush();
 | |
|             byte[] bytes = out.toByteArray();
 | |
|             ByteBuffer buffer = ByteBuffer.allocate(HEADER_SIZE + bytes.length);
 | |
|             buffer.put(MAGIC_BYTE);
 | |
|             buffer.putInt(SchemaRegistry.CURRENT.version());
 | |
|             buffer.put(bytes);
 | |
|             return buffer.array();
 | |
|         } catch (IOException | RuntimeException e) {
 | |
|             throw new SerializationException("Error serializing OurDataMessage Avro message", e);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| 
 | |
| import org.apache.avro.io.DatumReader;
 | |
| import org.apache.avro.io.DecoderFactory;
 | |
| import org.apache.avro.specific.SpecificDatumReader;
 | |
| 
 | |
| public class AvroOurDataMessageDeserializer implements OurDataMessageDeserializer<byte[]> {
 | |
|     private static final byte MAGIC_BYTE = 0;
 | |
|     private static final int HEADER_SIZE = 5;
 | |
| 
 | |
|     private final DecoderFactory decoderFactory = DecoderFactory.get();
 | |
| 
 | |
|     @Override
 | |
|     public OurDataMessage deserialize(byte[] payloadWithHeader) {
 | |
|         try {
 | |
|             ByteBuffer buffer = ByteBuffer.wrap(payloadWithHeader);
 | |
|             int writerSchemaVersion = 1;
 | |
|             byte[] payload = payloadWithHeader;
 | |
|             if (hasMagicByte(buffer.get())) {
 | |
|                 writerSchemaVersion = buffer.getInt();
 | |
|                 payload = new byte[payloadWithHeader.length - HEADER_SIZE];
 | |
|                 buffer.get(payload);
 | |
|             }
 | |
|             DatumReader<OurDataMessage> reader = new SpecificDatumReader<>(SchemaRegistry.getSchema(writerSchemaVersion), SchemaRegistry.CURRENT.schema());
 | |
|             return reader.read(null, decoderFactory.binaryDecoder(payload, null));
 | |
|         } catch (IOException | RuntimeException e) {
 | |
|             throw new SerializationException("Error deserializing OurDataMessage Avro message to schema [" + OurDataMessage.getClassSchema().toString() + "]", e);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     private static boolean hasMagicByte(byte magicByte) {
 | |
|         return magicByte == MAGIC_BYTE;
 | |
|     }
 | |
| }
 | 
