Avro Schema Evolution
I’m involved in an ambitious project to split a monolith into a service-oriented architecture connected via a stream of events using Kafka topics. Kafka is agnostic to the data serialization format used, and I’ve been looking at Avro in particular.
Avro – like Protocol Buffers and Thrift – is a binary format, making it more space-efficient than a text and verbose format like JSON. It stands out in that it’s schema support logical and complex types, and it’s decoupling between the writer’s schema and the reader’s schema, which provides flexibility.
The schema for an application’s data is expected to change over time. In database-backed applications, this is typically done by changing the data shape with a migration. I’ve written about how deploying schema migrations needs to be though of carefully. In that article I covered a few different strategies, but they all shared a common trait: All the data has one shape before the migrations, and a different one afterwards. That is incompatible with applications that rely on an immutable log. Let’s explore Avro’s schema evolution.
An Example Schema
We’ll start our example with an Avro schema for a CatalogItem
, an example entity that could be used in an e-commerce application. The schema is defined in JSON:
{
"type": "record",
"name": "CatalogItem",
"fields": [{
"name": "id",
"type": "long"
},
{
"name": "description",
"type": "string"
},
{
"name": "price",
"type": "bytes",
"logicalType": "decimal",
"precision": "30",
"scale": "12"
}
]
}
A robust application will have some sort of schema registry. For the purpose of this example, I will hide that complexity and implement the simplest thing that works:
require 'avro'
module SchemaRegistry
V1 = Avro::Schema.parse(V1_JSON)
end
Avro’s Ruby library is very verbose. Let’s abstract away some of the boilerplate:
module AvroHelper
def self.serialize(writers_schema, datum)
target = StringIO.new
writer = Avro::IO::DatumWriter.new(writers_schema)
encoder = Avro::IO::BinaryEncoder.new(target)
writer.write(datum, encoder)
target.string
end
def self.deserialize(writers_schema, readers_schema, encoded_data)
source = StringIO.new(encoded_data)
reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
decoder = Avro::IO::BinaryDecoder.new(source)
reader.read(decoder)
end
end
Serializing and De-serializing Messages
The basics of Avro in action:
message_1 = {
"id" => 1,
"description" => "iPhone SE",
"price" => "399.99"
}
Avro::Schema.validate(SchemaRegistry::V1, message_1)
# => true
encoded_message_1 = AvroHelper.serialize(SchemaRegistry::V1, message_1)
# => "\u0002\u0012iPhone SE\f399.99"
AvroHelper.deserialize(
SchemaRegistry::V1,
SchemaRegistry::V1,
encoded_message_1
)
# => {"id"=>1, "description"=>"iPhone SE", "price"=>"399.99"}
As we can see, we can roundtrip data to and from Avro. When de-serializing, both the writer’s and reader’s schema need to be provided, even id they are the same, as in the example above.
Message Evolution
Our fictional e-commerce application is ready to take over the world. We need support for multi-currency in our application. Let’s add a currency field for V2
of our CatalogItem
{
"type": "record",
"name": "CatalogItem",
"fields": [{
"name": "id",
"type": "long"
},
{
"name": "description",
"type": "string"
},
{
"name": "price",
"type": "bytes",
"logicalType": "decimal",
"precision": "30",
"scale": "12"
},
{
"name": "currency",
"type": "string",
"default": "USD"
}
]
}
module SchemaRegistry
V2 = Avro::Schema.parse(V2_JSON)
end
We’ve added an currency
field with a default value. This gives us backward-compatibility. We can continue reading existing messages with the new schema.
AvroHelper.deserialize(
SchemaRegistry::V1,
SchemaRegistry::V2,
encoded_message_1
)
# => {"id"=>1,
# "description"=>"iPhone SE",
# "price"=>"399.99",
# "currency"=>"USD"}
Avro::SchemaCompatibility.can_read?(SchemaRegistry::V1, SchemaRegistry::V2)
# => true
The original message_1
did not include a currency
field, but when converting to a V2
schema the default kicks. What about the other way around?
message_2 = {
"id" => 2,
"description" => "Pixel 3",
"price" => "279.00",
"currency" => "MXN"
}
encoded_message_2 = AvroHelper.serialize(SchemaRegistry::V2, message_2)
# => "\u0004\u000EPixel 3\f279.00\u0006MXN"
AvroHelper.deserialize(
SchemaRegistry::V2,
SchemaRegistry::V1,
encoded_message_2
)
# => {"id"=>2, "description"=>"Pixel 3", "price"=>"279.00"}
Avro::SchemaCompatibility.mutual_read?(SchemaRegistry::V1, SchemaRegistry::V2)
# => true
Avro considers this change to be forward-compatible: The messages written by a later schema are readable by a previous schema. Since currency
is an optional field, Avro’s resolution rules dictate that it can be ignored when reading in V1
schema. message_2
was written with a currency
value of MXN
, but read without one. More than likely, our business logic would assume that the currency would be USD
. This is a potential source of bugs, and up to the developer to correctly handle this type of situation.
Forging ahead, let us now consider that we want to make the currency
field a required one. Our V3
schema is now:
{
"type": "record",
"name": "CatalogItem",
"fields": [{
"name": "id",
"type": "long"
},
{
"name": "description",
"type": "string"
},
{
"name": "price",
"type": "bytes",
"logicalType": "decimal",
"precision": "30",
"scale": "12"
},
{
"name": "currency",
"type": "string"
}
]
}
module SchemaRegistry
V3 = Avro::Schema.parse(V3_JSON)
end
Is V3
backwards-compatible with previous schemas?
Avro::SchemaCompatibility.can_read?(SchemaRegistry::V1, SchemaRegistry::V3)
# => false
AvroHelper.deserialize(
SchemaRegistry::V1,
SchemaRegistry::V3,
encoded_message_1
) rescue $!
# => #<Avro::AvroError: Missing data for "string" with no default>
Avro::SchemaCompatibility.can_read?(SchemaRegistry::V2, SchemaRegistry::V3)
# => true
AvroHelper.deserialize(
SchemaRegistry::V2,
SchemaRegistry::V3,
encoded_message_2
)
# => {"id"=>2, "description"=>"Pixel 3", "price"=>"279.00", "currency"=>"MXN"}
V3
is backward-compatible with V2
, but not V1
. The currency
field is expected, but it’s not there.
Evolving Topics
A path of reconciling backwards-compatible changes, is to evolve the messages in a topic and write them to a new topic. This is known as Copy and Replace. Let’s model a topic by using an array of objects that contain messages accompanied by the writer’s schema:
v1_events = [
{
writers_schema: 'SchemaRegistry::V1',
message: encoded_message_1
},
{
writers_schema: 'SchemaRegistry::V2',
message: encoded_message_2
}
]
v1_events.map { |wrapper|
AvroHelper.deserialize(
Object.const_get(wrapper[:writers_schema]),
SchemaRegistry::V2,
wrapper[:message]
)
}
# => [{"id"=>1,
# "description"=>"iPhone SE",
# "price"=>"399.99",
# "currency"=>"USD"},
# {"id"=>2,
# "description"=>"Pixel 3",
# "price"=>"279.00",
# "currency"=>"MXN"}]
As expected, the topic is backwards-compatible with V1
. We know that V3
is backward-compatible with V2
, so we can create a new topic and evolve all the events:
v2_events = v1_events.map { |wrapper|
AvroHelper.deserialize(
Object.const_get(wrapper[:writers_schema]),
SchemaRegistry::V2,
wrapper[:message]
)
}.map { |datum|
{
writers_schema: 'SchemaRegistry::V2',
message: AvroHelper.serialize(SchemaRegistry::V2, datum)
}
}
# => [{:writers_schema=>"SchemaRegistry::V2",
# :message=>"\u0002\u0012iPhone SE\f399.99\u0006USD"},
# {:writers_schema=>"SchemaRegistry::V2",
# :message=>"\u0004\u000EPixel 3\f279.00\u0006MXN"}]
As expected, the topic can be read correctly in V3
:
v2_events.map { |wrapper|
AvroHelper.deserialize(
Object.const_get(wrapper[:writers_schema]),
SchemaRegistry::V3,
wrapper[:message]
)
}
# => [{"id"=>1,
# "description"=>"iPhone SE",
# "price"=>"399.99",
# "currency"=>"USD"},
# {"id"=>2,
# "description"=>"Pixel 3",
# "price"=>"279.00",
# "currency"=>"MXN"}]
In fact, because we “hydrated” the default currency values from V2
, we can evolve all the messages to V3
. Consumers in V2
will continue to be able to read the values (V3
is forward-compatible), because we ensured that default values were written to all messages.
v3_events = v1_events.map { |wrapper|
AvroHelper.deserialize(
Object.const_get(wrapper[:writers_schema]),
SchemaRegistry::V2,
wrapper[:message]
)
}.map { |datum|
{
writers_schema: 'SchemaRegistry::V3',
message: AvroHelper.serialize(SchemaRegistry::V3, datum)
}
}
v3_events.map { |wrapper|
AvroHelper.deserialize(
Object.const_get(wrapper[:writers_schema]),
SchemaRegistry::V2,
wrapper[:message]
)
}
# => [{"id"=>1,
# "description"=>"iPhone SE",
# "price"=>"399.99",
# "currency"=>"USD"},
# {"id"=>2,
# "description"=>"Pixel 3",
# "price"=>"279.00",
# "currency"=>"MXN"}]
Interestingly, because of how Avro serializes to binary, supplied optional fields and required fields are serialized in exactly the same way. The transformation from the hydrated V2
to V3
is a no-op:
v2_events == v3_events
# => false
Evolving Topics Without “Copy and Replace”
“Copy and Replace” poses operational concerns: How do we coordinate the copying of all events to a new topic and the writing of new incoming events, without sacrificing the order? In the previous example v1_events
was evolved into v2_events
(and the identical v3_events
) because we knew that the each Avro schema was backward-compatible. We can encode that knowledge into our registry:
module SchemaRegistry
EVOLUTION = [
'SchemaRegistry::V1',
'SchemaRegistry::V2',
'SchemaRegistry::V3'
]
end
With that in place, we can evolve each message to the latest version at read-time:
module AvroHelper
# This code works, but is not optimized. It serializes and de-serializes the
# messages more than strictly necessary.
def self.evolve_to_latest(evolution, writers_schema:, message:)
(evolution.index(writers_schema)..(evolution.size - 1)).reduce(message) { |encoded, schema_index|
current_schema = Object.const_get(evolution[schema_index])
next_schema = Object.const_get(evolution[schema_index + 1] || evolution.last)
serialize(next_schema, deserialize(current_schema, next_schema, encoded))
}.then { |encoded|
latest_schema = Object.const_get(evolution.last)
deserialize(latest_schema, latest_schema, encoded)
}
end
end
message_3 = {
"id" => 3,
"description" => "Galaxy S20",
"price" => "800.00",
"currency" => "EUR"
}
v1_events << {
writers_schema: 'SchemaRegistry::V3',
message: AvroHelper.serialize(SchemaRegistry::V3, message_3)
}
v1_events.map { |wrapper|
AvroHelper.evolve_to_latest(SchemaRegistry::EVOLUTION, **wrapper)
}
# => [{"id"=>1,
# "description"=>"iPhone SE",
# "price"=>"399.99",
# "currency"=>"USD"},
# {"id"=>2,
# "description"=>"Pixel 3",
# "price"=>"279.00",
# "currency"=>"MXN"},
# {"id"=>3,
# "description"=>"Galaxy S20",
# "price"=>"800.00",
# "currency"=>"EUR"}]
Conclusions
Avro makes a distinction between the schema used to write a message and the schema used to read it. Its resolution rules dictate the behavior. The separation of schema and serialized message implies that care must be taken that metadata about the serialized message be carried with it, so that the writer’s schema can be resolved.
Avro schema evolution doesn’t not solve all compatibility issues: The business rules for an acceptable schema evolution might be different than what Avro considers compatible, as illustrated by the V1
forward compatibility with V2
messages.
In database-backed applications, schema changes often require modifying existing records (e.g. “back-fills”). In message-broker-backed applications, it’s typical to have an immutable log of events. Forcing all consumers to be able to deal with every version of the event schema is burdensome, and error prone. A way to reconcile that is by using “Copy and Replace” to populate new, equivalent topics. Alternatively, we can design schema versions that are backward-compatible with consecutive version. We can encode the version evolution in a registry. Then evolve each message at [read-time]1 as necessary. This approach results in a reduction of the numbers of topics that need to be created, and the complexity of managing cut-offs.
-
In fact, newer versions of Postgres support a kind of read-time evolutions: When altering a table and adding a default value, Postgres no longer rewrites every entry in the table. Instead, at read-time if the value is not written in the row, it will look-up the default value. ↩
Find me on Mastodon at @ylansegal@mastodon.sdf.org,
or by email at ylan@{this top domain}
.