Serialization / Deserialization
Serializer and deserializer are defined under com.anymindgroup.pubsub.serde
.
A Deserializer
basically represents a function that takes a message with array of bytes as input and returns type T
by using environment R
.
Other meta information under ReceivedMessage
can be included into building type T
apart from the message body.
Defined as follows:
trait Deserializer[-R, +T] {
def deserialize(message: ReceivedMessage[Array[Byte]]): RIO[R, T]
}
Serializer
works the other way around, given input of type T
, return an array of bytes using environmet R
:
trait Serializer[-R, -T] {
def serialize(data: T): RIO[R, Array[Byte]]
}
Serde
is a combination of both:
trait Serde[-R, T] extends Serializer[R, T] with Deserializer[R, T] {}
The object com.anymindgroup.Serde
contains built-in serializers/deserializers.
Currently available:
Serde.byteArray
Serde.int
Serde.utf8String
Serializer/Deserializer for custom data types
Example of creating a serializer / deserializer for a data type from/to a JSON string using zio-json:
import com.anymindgroup.pubsub.*, zio.*, zio.json.*
// given data structure with a json codec like
case class MyData(name: String, age: Int) derives JsonCodec
// a deserializer implementation can look like
val myDataDes: Deserializer[Any, MyData] =
message =>
String(message.data).fromJson[MyData] match
case Left(err) => ZIO.fail(Throwable(s"Failed to deserialize: $err"))
case Right(value) => ZIO.succeed(value)
// and a serializer like
val myDataSer: Serializer[Any, MyData] = data => ZIO.succeed(data.toJson.getBytes())
Handling deserialization errors
Example for handling deserialization errors in a subsription process:
import com.anymindgroup.pubsub.*, zio.*, zio.json.*
case class MyData(name: String, age: Int) derives JsonCodec
// deserializer returning the result as Either instead of failing
val myDataDes: Deserializer[Any, Either[String, MyData]] = message =>
ZIO.succeed(String(message.data).fromJson[MyData])
// result can be handled in the subscription process e.g. like
val subStream =
Subscriber.subscribe("my_sub_name", myDataDes).mapZIO { (message, reply) =>
message.data match
case Left(err) => reply.nack() *> ZIO.logError(s"Failed to deserialize: $err")
case Right(data) => reply.ack() *> ZIO.logInfo(s"Data ok $data")
}