Getting Started with ZIO Google Cloud Pub/Sub
A Google Cloud Pub/Sub client providing a stream-based, declarative, high-level API built on zio and zio-streams, so that you can concentrate on the business logic.
Released for Scala 3, targeting the JVM and Native (via scala-native), with the exception of some modules that depend on Java libraries.
Scala.js support could potentially be added.
The Scala 2.13 release is kept in the v0.2.x release series on the series/0.2.x branch, but is not officially maintained. If you still rely on the 2.13 release and require updates, you may raise PRs targeting the series/0.2.x branch or create a fork.
Modules
Name | Description | JVM | Native |
---|---|---|---|
zio-pubsub | Core components/interfaces/models | ✅ | ✅ |
zio-pubsub-http | Implementation using Pub/Sub REST API based on clients from AnyMindGroup/zio-gcp | ✅ | ✅ |
zio-pubsub-google | Provides gRPC based client implementations via Google's Java library by using the StreamingPull API for subscriptions. | ✅ | ❌ |
zio-pubsub-serde-zio-schema | Provides Serializer/Deserializer using the zio-schema binary codec | ✅ | ✅ |
Getting Started
To get started with sbt, add the following line to your build.sbt file:
libraryDependencies += "com.anymindgroup" %% "zio-pubsub-http" % "0.3.1"
// or for Google's Java client based implementation:
// libraryDependencies += "com.anymindgroup" %% "zio-pubsub-google" % "0.3.1"
// Both can also be used interchangeably
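The modules table lists zio-pubsub-http as available for Native as well as the JVM. A minimal sketch of the corresponding dependency line for a Scala Native or cross-built project might look as follows. It assumes the sbt-scala-native (or sbt-crossproject) plugin is enabled, since that is what provides the `%%%` operator; the version is simply copied from the line above.

```scala
// build.sbt (sketch): assumes a Scala Native or cross project with the
// corresponding sbt plugin enabled, which provides the %%% operator
libraryDependencies += "com.anymindgroup" %%% "zio-pubsub-http" % "0.3.1"
```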
Usage examples
Create a stream for an existing subscription:
import com.anymindgroup.pubsub.*, zio.*, zio.stream.ZStream

object BasicSubscription extends ZIOAppDefault:
  def run =
    // create a subscription stream based on Subscriber implementation provided
    def subStream(s: Subscriber): ZStream[Any, Throwable, Unit] =
      s.subscribe(
        subscriptionName = SubscriptionName("gcp_project", "subscription"),
        deserializer = Serde.utf8String,
      ).mapZIO: (message, ackReply) =>
        for
          _ <- ZIO.logInfo(
                 s"Received message" +
                   s" with id ${message.messageId.value}" +
                   s" and data ${message.data}"
               )
          _ <- ackReply.ack()
        yield ()

    val makeSubscriber: RIO[Scope, Subscriber] =
      // make http based Subscriber implementation
      http.makeSubscriber(
        // set by default to "PubsubConnectionConfig.Cloud" when not running against an emulator
        connection = PubsubConnectionConfig.Emulator("localhost", 8085)
      )
      // or similarly by using gRPC/StreamingPull API based implementation via Google's Java client:
      // google.makeStreamingPullSubscriber(
      //   connection = PubsubConnectionConfig.Emulator("localhost", 8085)
      // )

    makeSubscriber.flatMap(subStream(_).runDrain)
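The subscription example acknowledges each message right after logging it. When real processing can fail, one option is to acknowledge only after the processing step has succeeded. The following is a minimal sketch of that idea using plain ZIO; `ackAfter` is a hypothetical helper and not part of zio-pubsub, and the acknowledgement effects are passed in as parameters so that the sketch makes no assumptions about the library's `AckReply` type.

```scala
import zio.*

// Hypothetical helper (not part of zio-pubsub): runs `process`, then runs
// `onSuccess` if it succeeded or `onFailure` if it failed with a typed error.
// The original result or error of `process` is preserved.
def ackAfter[A](
  process: Task[A],
  onSuccess: Task[Any],
  onFailure: Task[Any],
): Task[A] =
  process.tapBoth(_ => onFailure, _ => onSuccess)
```

Inside the `mapZIO` step this could be used as `ackAfter(handle(message.data), ackReply.ack(), ZIO.unit)`, where `handle` is a hypothetical processing function. Leaving a message unacknowledged lets Pub/Sub redeliver it once the acknowledgement deadline passes. Note that a failed `process` still fails the stream unless the error is handled further downstream.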
Publish a random string every 2 seconds:
import com.anymindgroup.pubsub.*, zio.stream.*, zio.*, zio.ZIO.*

object SamplesPublisher extends ZIOAppDefault:
  def run =
    // run samples publishing given Publisher implementation
    def samplesPublish(p: Publisher[Any, String]) =
      ZStream
        .repeatZIOWithSchedule(Random.nextInt.map(i => s"some data $i"), Schedule.fixed(2.seconds))
        .mapZIO { sample =>
          for {
            mId <- p.publish(
                     PublishMessage(
                       data = sample,
                       attributes = Map.empty,
                       orderingKey = None,
                     )
                   )
            _ <- logInfo(s"Published data $sample with message id ${mId.value}")
          } yield ()
        }
        .runDrain

    val makePublisher: RIO[Scope, Publisher[Any, String]] =
      // make http based topic publisher
      http.makeTopicPublisher(
        topicName = TopicName("gcp_project", "topic"),
        serializer = Serde.utf8String,
        // set by default to "PubsubConnectionConfig.Cloud" when not running against an emulator
        connection = PubsubConnectionConfig.Emulator("localhost", 8085),
      )
      // or similarly by using gRPC based implementation via Google's Java client:
      // google.makeTopicPublisher(
      //   topicName = TopicName("gcp_project", "topic"),
      //   serializer = Serde.utf8String,
      //   connection = PubsubConnectionConfig.Emulator("localhost", 8085),
      // )

    makePublisher.flatMap(samplesPublish)
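In the publisher example, a single failed `publish` call fails the `mapZIO` step and therefore ends the whole stream. One way to make this more tolerant of transient errors is to retry the effect with a backoff schedule. The sketch below uses only standard ZIO combinators; `withBackoff` is a hypothetical helper, not part of zio-pubsub, and it assumes the wrapped effect fails with a `Throwable`.

```scala
import zio.*

// Hypothetical helper (not part of zio-pubsub): retries a failing effect with
// exponential backoff starting at `base`, giving up after `maxRetries`
// additional attempts and then failing with the last error.
def withBackoff[R, A](
  effect: ZIO[R, Throwable, A],
  maxRetries: Int = 3,
  base: Duration = 200.millis,
): ZIO[R, Throwable, A] =
  effect.retry(Schedule.exponential(base) && Schedule.recurs(maxRetries))
```

In the example above, the call could then be wrapped as `withBackoff(p.publish(...))`, assuming `publish` returns an effect with a `Throwable` error channel.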
Set up topics and subscriptions with dead letter settings using the admin API:
import com.anymindgroup.gcp.pubsub.v1.*
import com.anymindgroup.pubsub.*, http.*
import zio.*

object ExamplesAdminSetup extends ZIOAppDefault:
  // topics
  val exampleTopic            = TopicName("gcp_project", "topic")
  val exampleDeadLettersTopic = exampleTopic.copy(topic = s"${exampleTopic.topic}__dead_letters")

  // subscriptions
  val subName = SubscriptionName(exampleTopic.projectId, "subscription")
  val exampleSub: Subscription = Subscription(
    topicName = exampleTopic,
    name = subName,
    filter = None,
    enableOrdering = false,
    expiration = None,
    deadLettersSettings = Some(DeadLettersSettings(exampleDeadLettersTopic, 5)),
  )
  val exampleDeadLettersSub: Subscription = exampleSub.copy(
    topicName = exampleDeadLettersTopic,
    name = subName.copy(subscription = s"${subName.subscription}__dead_letters"),
    deadLettersSettings = None,
  )

  def run =
    makeAuthedBackend(
      // set by default to "PubsubConnectionConfig.Cloud" when not running against an emulator
      connection = PubsubConnectionConfig.Emulator(host = "localhost", port = 8085)
    ).flatMap: backend =>
      for
        _ <- ZIO.foreach(List(exampleTopic, exampleDeadLettersTopic)): topic =>
               resources.projects.Topics
                 .create(
                   projectsId = topic.projectId,
                   topicsId = topic.topic,
                   request = schemas.Topic(name = topic.fullName),
                 )
                 .send(backend)
        _ <- ZIO.foreach(List(exampleSub, exampleDeadLettersSub)): subscription =>
               resources.projects.Subscriptions
                 .create(
                   projectsId = subscription.name.projectId,
                   subscriptionsId = subscription.name.subscription,
                   request = schemas.Subscription(
                     name = subscription.name.fullName,
                     topic = subscription.topicName.fullName,
                   ),
                 )
                 .send(backend)
      yield ()
To run the examples, start the Google Pub/Sub emulator with docker-compose using the provided docker-compose.yaml:
docker-compose up
Run examples with sbt:
# run to setup example topics + subscription
sbt 'examples/runMain ExamplesAdminSetup'
# run subscription
sbt 'examples/runMain BasicSubscription'
# run samples publisher
sbt 'examples/runMain SamplesPublisher'
# or choose in sbt which example to run
sbt 'examples/run'
For more examples, check out the examples folder or the files Http PubsubCloudTest.scala and Google PubsubCloudTest.scala, which contain usage of the subscriber, publisher and admin API running against an actual Pub/Sub instance in the cloud, intended for manual execution.