From 39f81c115d95278069b4e07085adb388b6e2c4a5 Mon Sep 17 00:00:00 2001 From: adamw Date: Tue, 19 Nov 2024 13:24:11 +0100 Subject: [PATCH] WIP --- .github/workflows/ci.yml | 38 +++++ .github/workflows/scala-steward.yml | 28 +++ .gitignore | 25 +++ .mergify.yml | 49 ++++++ .scala-steward.conf | 3 + .scalafmt.conf | 4 + build.sbt | 28 +++ core/src/main/resources/logback.xml | 12 ++ ..._blocking_Kafka_streaming_on_the_JVM.scala | 16 ++ .../scala/pres/S020_Virtual_threads.scala | 27 +++ core/src/main/scala/pres/S030_Why_Loom.scala | 12 ++ .../pres/S040_Why_reactive_streams.scala | 19 +++ .../pres/S050_Pekko_streams_example.scala | 81 +++++++++ .../pres/S060_Structured_concurrency.scala | 29 ++++ core/src/main/scala/pres/S070_Channels.scala | 24 +++ .../pres/S075_Channels_performance.scala | 19 +++ core/src/main/scala/pres/S080_Flow.scala | 47 +++++ .../main/scala/pres/S090_Kafka_publish.scala | 16 ++ .../pres/S100_Kafka_publish_commit.scala | 57 +++++++ .../main/scala/pres/S110_SoftwareMill.scala | 25 +++ core/src/main/scala/pres/S120_Pros_cons.scala | 34 ++++ core/src/main/scala/pres/S130_Summary.scala | 26 +++ core/src/main/scala/pres/setupKafka.scala | 42 +++++ .../src/main/scala/pres/startHttpServer.scala | 28 +++ core/src/main/scala/pres/util.scala | 15 ++ docker-compose.yml | 161 ++++++++++++++++++ project/build.properties | 1 + project/plugins.sbt | 2 + version.sbt | 1 + 29 files changed, 869 insertions(+) create mode 100644 .github/workflows/ci.yml create mode 100644 .github/workflows/scala-steward.yml create mode 100644 .gitignore create mode 100644 .mergify.yml create mode 100644 .scala-steward.conf create mode 100644 .scalafmt.conf create mode 100644 build.sbt create mode 100644 core/src/main/resources/logback.xml create mode 100644 core/src/main/scala/pres/S010_Fast_blocking_Kafka_streaming_on_the_JVM.scala create mode 100644 core/src/main/scala/pres/S020_Virtual_threads.scala create mode 100644 core/src/main/scala/pres/S030_Why_Loom.scala create mode 100644 core/src/main/scala/pres/S040_Why_reactive_streams.scala create mode 100644 core/src/main/scala/pres/S050_Pekko_streams_example.scala create mode 100644 core/src/main/scala/pres/S060_Structured_concurrency.scala create mode 100644 core/src/main/scala/pres/S070_Channels.scala create mode 100644 core/src/main/scala/pres/S075_Channels_performance.scala create mode 100644 core/src/main/scala/pres/S080_Flow.scala create mode 100644 core/src/main/scala/pres/S090_Kafka_publish.scala create mode 100644 core/src/main/scala/pres/S100_Kafka_publish_commit.scala create mode 100644 core/src/main/scala/pres/S110_SoftwareMill.scala create mode 100644 core/src/main/scala/pres/S120_Pros_cons.scala create mode 100644 core/src/main/scala/pres/S130_Summary.scala create mode 100644 core/src/main/scala/pres/setupKafka.scala create mode 100644 core/src/main/scala/pres/startHttpServer.scala create mode 100644 core/src/main/scala/pres/util.scala create mode 100644 docker-compose.yml create mode 100644 project/build.properties create mode 100644 project/plugins.sbt create mode 100644 version.sbt diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..00689b3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,38 @@ +name: CI +on: + pull_request: + branches: ['**'] + push: + branches: ['**'] + tags: [v*] +jobs: + ci: + # run on external PRs, but not on internal PRs since those will be run by push to branch + if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != github.repository + runs-on: ubuntu-20.04 + steps: + - name: Checkout + uses: actions/checkout@v2 + - name: Set up JDK 11 + uses: actions/setup-java@v1 + with: + java-version: 11 + - name: Cache sbt + uses: actions/cache@v2 + with: + path: | + ~/.sbt + ~/.ivy2/cache + ~/.coursier + key: sbt-cache-${{ runner.os }}-${{ hashFiles('project/build.properties') }} + - name: Compile + run: sbt -v compile + - name: Test + run: sbt -v test + - name: Cleanup + run: | + rm -rf "$HOME/.ivy2/local" || true + find $HOME/.ivy2/cache -name "ivydata-*.properties" -delete || true + find $HOME/.ivy2/cache -name "*-LM-SNAPSHOT*" -delete || true + find $HOME/.cache/coursier/v1 -name "ivydata-*.properties" -delete || true + find $HOME/.sbt -name "*.lock" -delete || true diff --git a/.github/workflows/scala-steward.yml b/.github/workflows/scala-steward.yml new file mode 100644 index 0000000..7b59b45 --- /dev/null +++ b/.github/workflows/scala-steward.yml @@ -0,0 +1,28 @@ +name: Scala Steward + +# This workflow will launch at 00:00 every day +on: + schedule: + - cron: '0 0 * * *' + workflow_dispatch: + +jobs: + scala-steward: + runs-on: ubuntu-22.04 + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up JDK 11 + uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: 11 + cache: 'sbt' + - name: Launch Scala Steward + uses: scala-steward-org/scala-steward-action@v2 + with: + author-name: scala-steward + author-email: scala-steward + github-token: ${{ secrets.REPO_GITHUB_TOKEN }} + repo-config: .scala-steward.conf + ignore-opts-files: false diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d997e95 --- /dev/null +++ b/.gitignore @@ -0,0 +1,25 @@ +*.class +*.log + +.cache +.env +.envrc +.history +.sdkmanrc +.lib/ +dist/* +target/ +lib_managed/ +local.conf +src_managed/ +project/boot/ +project/plugins/project/ + +.idea* + +# Metals +.metals/ +.bsp/ +.bloop/ +metals.sbt +.vscode diff --git a/.mergify.yml b/.mergify.yml new file mode 100644 index 0000000..a4d4900 --- /dev/null +++ b/.mergify.yml @@ -0,0 +1,49 @@ +pull_request_rules: + - name: delete head branch after merge + conditions: [] + actions: + delete_head_branch: {} + - name: automatic merge for softwaremill-ci pull requests affecting build.sbt + conditions: + - author=softwaremill-ci + - check-success=ci + - "#files=1" + - files=build.sbt + actions: + merge: + method: merge + - name: automatic merge for softwaremill-ci pull requests affecting project plugins.sbt + conditions: + - author=softwaremill-ci + - check-success=ci + - "#files=1" + - files=project/plugins.sbt + actions: + merge: + method: merge + - name: semi-automatic merge for softwaremill-ci pull requests + conditions: + - author=softwaremill-ci + - check-success=ci + - "#approved-reviews-by>=1" + actions: + merge: + method: merge + - name: automatic merge for softwaremill-ci pull requests affecting project build.properties + conditions: + - author=softwaremill-ci + - check-success=ci + - "#files=1" + - files=project/build.properties + actions: + merge: + method: merge + - name: automatic merge for softwaremill-ci pull requests affecting .scalafmt.conf + conditions: + - author=softwaremill-ci + - check-success=ci + - "#files=1" + - files=.scalafmt.conf + actions: + merge: + method: merge diff --git a/.scala-steward.conf b/.scala-steward.conf new file mode 100644 index 0000000..51705ee --- /dev/null +++ b/.scala-steward.conf @@ -0,0 +1,3 @@ +updates.ignore = [ + {groupId = "org.scala-lang", artifactId = "scala-compiler", version = "2.13."}, +] diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..fdd3490 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,4 @@ +version = 3.8.0 +maxColumn = 80 +rewrite.rules = [RedundantBraces, RedundantParens, SortImports] +runner.dialect = scala3 diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..78883b6 --- /dev/null +++ b/build.sbt @@ -0,0 +1,28 @@ +import com.softwaremill.SbtSoftwareMillCommon.commonSmlBuildSettings + +lazy val commonSettings = commonSmlBuildSettings ++ Seq( + organization := "com.softwaremill.kafka", + scalaVersion := "3.5.2" +) + +val scalaTest = "org.scalatest" %% "scalatest" % "3.2.18" % Test + +lazy val rootProject = (project in file(".")) + .settings(commonSettings: _*) + .settings(publishArtifact := false, name := "kafka-ox-pres") + .aggregate(core) + +lazy val core: Project = (project in file("core")) + .settings(commonSettings: _*) + .settings( + name := "core", + libraryDependencies ++= Seq( + "com.softwaremill.sttp.tapir" %% "tapir-netty-server-sync" % "1.11.9", + "com.softwaremill.ox" %% "kafka" % "0.5.3", + "ch.qos.logback" % "logback-classic" % "1.5.8", + "org.apache.pekko" %% "pekko-connectors-kafka" % "1.1.0", + "org.apache.pekko" %% "pekko-stream" % "1.1.2", + "com.softwaremill.sttp.client4" %% "core" % "4.0.0-M19", + scalaTest + ) + ) diff --git a/core/src/main/resources/logback.xml b/core/src/main/resources/logback.xml new file mode 100644 index 0000000..9db4430 --- /dev/null +++ b/core/src/main/resources/logback.xml @@ -0,0 +1,12 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + \ No newline at end of file diff --git a/core/src/main/scala/pres/S010_Fast_blocking_Kafka_streaming_on_the_JVM.scala b/core/src/main/scala/pres/S010_Fast_blocking_Kafka_streaming_on_the_JVM.scala new file mode 100644 index 0000000..4608d2c --- /dev/null +++ b/core/src/main/scala/pres/S010_Fast_blocking_Kafka_streaming_on_the_JVM.scala @@ -0,0 +1,16 @@ +package pres + +/* + _____ _ _ _ _ _ _ + | _ |_| |___ _____ | | | |___ ___ ___| |_|_| + | | . | .'| | | | | | .'| _|_ -| '_| | + |__|__|___|__,|_|_|_| |_____|__,|_| |___|_,_|_| + + ██╗ ██╗ █████╗ ███████╗██╗ ██╗ █████╗ + ██║ ██╔╝██╔══██╗██╔════╝██║ ██╔╝██╔══██╗ + █████╔╝ ███████║█████╗ █████╔╝ ███████║ + ██╔═██╗ ██╔══██║██╔══╝ ██╔═██╗ ██╔══██║ + ██║ ██╗██║ ██║██║ ██║ ██╗██║ ██║ + ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ╚═╝ ╚═╝╚═╝ ╚═╝ + */ +class S010_Fast_blocking_Kafka_streaming_on_the_JVM diff --git a/core/src/main/scala/pres/S020_Virtual_threads.scala b/core/src/main/scala/pres/S020_Virtual_threads.scala new file mode 100644 index 0000000..257b79b --- /dev/null +++ b/core/src/main/scala/pres/S020_Virtual_threads.scala @@ -0,0 +1,27 @@ +package pres + +import ox.discard +import java.util.concurrent.ConcurrentHashMap + +@main def s020_Virtual_threads() = + timed("loom") { + val threads = new Array[Thread](10_000_000) + val results = ConcurrentHashMap.newKeySet[Int]() + + var i = 0 + while i < threads.length do + threads(i) = Thread + .ofVirtual() + .start( + new Runnable: + def run() = results.add(0).discard + ); + i += 1 + end while + + i = 0 + while i < threads.length do + threads(i).join() + i += 1 + end while + } diff --git a/core/src/main/scala/pres/S030_Why_Loom.scala b/core/src/main/scala/pres/S030_Why_Loom.scala new file mode 100644 index 0000000..c3f8a04 --- /dev/null +++ b/core/src/main/scala/pres/S030_Why_Loom.scala @@ -0,0 +1,12 @@ +package pres + +class S030_Why_Loom { + /* + + Loom principles: + * direct syntax, blocking code + * non-viral + * meaningful stack traces + + */ +} diff --git a/core/src/main/scala/pres/S040_Why_reactive_streams.scala b/core/src/main/scala/pres/S040_Why_reactive_streams.scala new file mode 100644 index 0000000..923ae02 --- /dev/null +++ b/core/src/main/scala/pres/S040_Why_reactive_streams.scala @@ -0,0 +1,19 @@ +package pres + +class S040_Why_reactive_streams { + /* + + [Reactive streams](https://www.reactive-streams.org) are useful! + What are they? + + * "govern the exchange of stream data across an asynchronous boundary" + * within bounded memory + + When to use RS? + * process streaming data + * manage concurrency + * interface with I/O operations + * safely handle errors + + */ +} diff --git a/core/src/main/scala/pres/S050_Pekko_streams_example.scala b/core/src/main/scala/pres/S050_Pekko_streams_example.scala new file mode 100644 index 0000000..b9a5aaf --- /dev/null +++ b/core/src/main/scala/pres/S050_Pekko_streams_example.scala @@ -0,0 +1,81 @@ +package pres + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.kafka.CommitterSettings +import org.apache.pekko.kafka.ConsumerSettings +import org.apache.pekko.kafka.ProducerMessage +import org.apache.pekko.kafka.ProducerSettings +import org.apache.pekko.kafka.Subscriptions +import org.apache.pekko.kafka.scaladsl.Committer +import org.apache.pekko.kafka.scaladsl.Consumer +import org.apache.pekko.kafka.scaladsl.Consumer.DrainingControl +import org.apache.pekko.kafka.scaladsl.Producer +import org.slf4j.LoggerFactory +import ox.discard +import ox.get +import sttp.client4.* +import sttp.client4.httpclient.HttpClientFutureBackend + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +@main def s050_Pekko_streams_example(): Unit = + val sourceTopic = "t1" + val destTopic = "t2" + val group = "g1" + + val logger = LoggerFactory.getLogger("pekko-streams-example") + + given system: ActorSystem = ActorSystem("transfer") + try + import system.dispatcher + + val producerSettings = + ProducerSettings(system, new StringSerializer, new StringSerializer) + .withBootstrapServers(bootstrapServer) + val consumerSettings = + ConsumerSettings(system, new StringDeserializer, new StringDeserializer) + .withBootstrapServers(bootstrapServer) + .withGroupId(group) + .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + val backend = HttpClientFutureBackend() + + val stream = Consumer + .committableSource(consumerSettings, Subscriptions.topics(sourceTopic)) + .mapAsync(5) { commitableMsg => + val msg = commitableMsg.record.value() + val work1 = basicRequest + .response(asString.getRight) + .post(uri"http://localhost:8080/work1") + .body(msg) + .send(backend) + val work2 = basicRequest + .response(asString.getRight) + .post(uri"http://localhost:8080/work2") + .body(msg) + .send(backend) + + for { + response1 <- work1 + response2 <- work2 + result = response1.body + ": " + response2.body + _ = logger.info(s"Result for $msg: $result") + } yield ProducerMessage.single( + new ProducerRecord[String, String](destTopic, null, result), + commitableMsg.committableOffset + ) + } + .via(Producer.flexiFlow(producerSettings)) + .map(_.passThrough) + .toMat(Committer.sink(CommitterSettings(system)))(DrainingControl.apply) + .run() + .streamCompletion + + stream.get().discard + finally system.terminate().get().discard +end s050_Pekko_streams_example diff --git a/core/src/main/scala/pres/S060_Structured_concurrency.scala b/core/src/main/scala/pres/S060_Structured_concurrency.scala new file mode 100644 index 0000000..0a45a53 --- /dev/null +++ b/core/src/main/scala/pres/S060_Structured_concurrency.scala @@ -0,0 +1,29 @@ +package pres + +import ox.* + +import scala.concurrent.duration.DurationInt + +@main def s060_Structured_concurrency_1(): Unit = + supervised: + val f1 = fork: + sleep(2.seconds) + 1 + + val f2 = fork: + sleep(1.second) + 2 + + println(f1.join() + f2.join()) + +@main def s060_Structured_concurrency_2(): Unit = + supervised: + val f1 = fork: + sleep(2.seconds) + 1 + + val f2 = fork[Int]: + sleep(1.second) + throw new RuntimeException("boom") + + println(f1.join() + f2.join()) diff --git a/core/src/main/scala/pres/S070_Channels.scala b/core/src/main/scala/pres/S070_Channels.scala new file mode 100644 index 0000000..efce70f --- /dev/null +++ b/core/src/main/scala/pres/S070_Channels.scala @@ -0,0 +1,24 @@ +package pres + +import ox.* +import ox.channels.Channel +import ox.channels.selectOrClosed + +import scala.concurrent.duration.DurationInt + +@main def s070_Channels(): Unit = + val ch1 = Channel.bufferedDefault[Int] + val ch2 = Channel.bufferedDefault[Int] + + supervised: + def startFork(ch: Channel[Int], start: Int, end: Int): Unit = + forkDiscard: + for i <- start until end do + ch.send(i) + sleep(100.millis) + ch.done() + + startFork(ch1, 0, 5) + startFork(ch2, 40, 45) + + for i <- 0 to 11 do println(selectOrClosed(ch1, ch2)) diff --git a/core/src/main/scala/pres/S075_Channels_performance.scala b/core/src/main/scala/pres/S075_Channels_performance.scala new file mode 100644 index 0000000..fda6dca --- /dev/null +++ b/core/src/main/scala/pres/S075_Channels_performance.scala @@ -0,0 +1,19 @@ +package pres + +object S075_Channels_performance { + /* + + Parallel benchmark: 10 000 thread/coroutine/goroutine pairs connected by channels, + and send 100 000 values from the first thread/coroutine/goroutine in the pair, + to the second: + + ╔══════════════════════════════════════════════════╗ + Ox ╢████████████████████████████████████████████░░░░░░╟ 14.155 + Kotlin ╢███████████████████████████████░░░░░░░░░░░░░░░░░░░╟ 9.847 + Java built-in ╢██████████████████████████████████████████████████╟ 16.053 + Go ╢████████████████████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░╟ 6.38 + ╠══════════════════════════════════════════════════╣ + 0 16.053 + + */ +} diff --git a/core/src/main/scala/pres/S080_Flow.scala b/core/src/main/scala/pres/S080_Flow.scala new file mode 100644 index 0000000..b0e8d77 --- /dev/null +++ b/core/src/main/scala/pres/S080_Flow.scala @@ -0,0 +1,47 @@ +package pres + +import ox.* +import ox.channels.Channel +import ox.channels.ChannelClosed +import ox.channels.selectOrClosed +import ox.flow.Flow + +import scala.concurrent.duration.DurationInt + +@main def s080_Flow_1(): Unit = + Flow + .fromValues(11, 24, 51, 76, 78, 9, 1, 44) + .map(_ + 3) + .filter(_ % 2 == 0) + .intersperse(5) + .mapStateful(() => 0) { (state, value) => + val newState = state + value + (newState, newState) + } + .runToList() + .pipe(println) + +@main def s080_Flow_2(): Unit = + val f1 = Flow.tick(1.second, "tick1") + val f2 = Flow.tick(2.seconds, "tick2") + + f1.merge(f2).runForeach(println) + +@main def s080_Flow_3(): Unit = + val data = Channel.bufferedDefault[Int] + val errors = Channel.unlimited[Exception] + + Flow + .usingEmit: emit => + // connect to Kafka using data & errors + forever: + selectOrClosed(errors.receiveClause, data.receiveClause) match + case data.Received(i) => + if i % 2 == 0 then emit(i) + else + emit(i) + emit(i + 1) + case errors.Received(e) => throw e + case ChannelClosed.Done => // end + case ChannelClosed.Error(e) => throw e + .discard diff --git a/core/src/main/scala/pres/S090_Kafka_publish.scala b/core/src/main/scala/pres/S090_Kafka_publish.scala new file mode 100644 index 0000000..57e50cc --- /dev/null +++ b/core/src/main/scala/pres/S090_Kafka_publish.scala @@ -0,0 +1,16 @@ +package pres + +import org.apache.kafka.clients.producer.ProducerRecord +import ox.flow.Flow +import ox.kafka.KafkaStage.* +import ox.kafka.ProducerSettings + +@main def s090_Kafka_publish(): Unit = + val settings = ProducerSettings.default.bootstrapServers(bootstrapServer) + Flow + .repeatEval(randomString()) + .take(1000) + .map(msg => ProducerRecord[String, String]("t1", msg)) + .mapPublish(settings) + .tap(metadata => println(s"Published offset: ${metadata.offset()}")) + .runDrain() diff --git a/core/src/main/scala/pres/S100_Kafka_publish_commit.scala b/core/src/main/scala/pres/S100_Kafka_publish_commit.scala new file mode 100644 index 0000000..2d8aafb --- /dev/null +++ b/core/src/main/scala/pres/S100_Kafka_publish_commit.scala @@ -0,0 +1,57 @@ +package pres + +import org.apache.kafka.clients.producer.ProducerRecord +import org.slf4j.LoggerFactory +import ox.kafka.ConsumerSettings +import ox.kafka.ConsumerSettings.AutoOffsetReset +import ox.kafka.KafkaFlow +import ox.kafka.KafkaStage.* +import ox.kafka.ProducerSettings +import ox.kafka.SendPacket +import ox.par +import sttp.client4.* +import sttp.client4.httpclient.HttpClientSyncBackend + +@main def s100_Kafka_publish_commit(): Unit = + val sourceTopic = "t1" + val destTopic = "t2" + val group = "g1" + + val logger = LoggerFactory.getLogger("pekko-streams-example") + + val consumerSettings = ConsumerSettings + .default(group) + .bootstrapServers(bootstrapServer) + .autoOffsetReset(AutoOffsetReset.Earliest) + val producerSettings = + ProducerSettings.default.bootstrapServers(bootstrapServer) + + val backend = HttpClientSyncBackend() + KafkaFlow + .subscribe(consumerSettings, sourceTopic) + .mapPar(5) { receivedMsg => + val msg = receivedMsg.value + + val (response1, response2) = par( + basicRequest + .response(asString.getRight) + .post(uri"http://localhost:8080/work1") + .body(msg) + .send(backend), + basicRequest + .response(asString.getRight) + .post(uri"http://localhost:8080/work2") + .body(msg) + .send(backend) + ) + + val result = response1.body + ": " + response2.body + logger.info(s"Result for $msg: $result") + + (result, receivedMsg) + } + .map((result, received) => + SendPacket(ProducerRecord[String, String](destTopic, result), received) + ) + .mapPublishAndCommit(producerSettings) + .runDrain() diff --git a/core/src/main/scala/pres/S110_SoftwareMill.scala b/core/src/main/scala/pres/S110_SoftwareMill.scala new file mode 100644 index 0000000..e99777f --- /dev/null +++ b/core/src/main/scala/pres/S110_SoftwareMill.scala @@ -0,0 +1,25 @@ +package pres + +object S110_SoftwareMill: + val SoftwareMillBlog = "https://softwaremill.com/blog" + + val Backends = true + val Frontends = true + val ML_AI = true + val DevOps = true + + val Consulting: List[String] = + List( + "Scala", + "Java", + "Kotlin", + "Rust", + "TypeScript", + "Architecture", + "Kafka", + "Cassandra", + "Distributed systems" + ) + + val OpenSource_Top5: List[String] = + List("sttp", "tapir", "elasticmq", "struts", "hibernate envers") diff --git a/core/src/main/scala/pres/S120_Pros_cons.scala b/core/src/main/scala/pres/S120_Pros_cons.scala new file mode 100644 index 0000000..1ea548b --- /dev/null +++ b/core/src/main/scala/pres/S120_Pros_cons.scala @@ -0,0 +1,34 @@ +package pres + +class S120_Pros_cons { + /* + + Alpakka / Pekko Kafka: + (+) versatile API + (+) battle-proven implementation + (+) flexible producers / consumer creation + (+) support for Kafka transactions + (-) difficult to create new stages + (-) colored functions (Future / normal) + (-) stack traces + (-) additional runtime needed + + Ox: + (+) performance (channels) + (+) easy to create new stages + (+) no colored functions + (+) stack traces + (+) no additional runtime needed + (-) in development + (-) no support for Kafka's transactions (yet) + (-) not optimized (yet) + (-) TBD? + + Both implementations are reactive: + * back-pressured + * error-handling + * completion + * asynchronous processing + + */ +} diff --git a/core/src/main/scala/pres/S130_Summary.scala b/core/src/main/scala/pres/S130_Summary.scala new file mode 100644 index 0000000..5548b35 --- /dev/null +++ b/core/src/main/scala/pres/S130_Summary.scala @@ -0,0 +1,26 @@ +package pres + +object S110_Summary: + def _0: String = "Yes, direct-style Kafka streams are possible" + + def _1: String = + "Flow = Structured concurrency + channels + virtual threads + lazy streams" + + def _2: String = "Avoid explicit concurrency using a functional API" + + def _3: String = "Integrate easily using an imperative API" + + def _4: String = + "Kafka's consume-transform-publish-commit pattern available OOTB" + + /* + Links: + - [Ox](https://github.com/softwaremill/ox) + - [ScalaTimes](https://scalatimes.com) + - [Myself](https://warski.org) + - [SoftwareMill](https://softwaremill.com) + */ + + val StarOnGitHub: Boolean = YesPlease + + val ThankYou = true diff --git a/core/src/main/scala/pres/setupKafka.scala b/core/src/main/scala/pres/setupKafka.scala new file mode 100644 index 0000000..ca932a3 --- /dev/null +++ b/core/src/main/scala/pres/setupKafka.scala @@ -0,0 +1,42 @@ +package pres + +import org.apache.kafka.clients.producer.ProducerRecord +import ox.* + +import ox.kafka.* +import ox.flow.Flow +import java.util.Properties +import scala.jdk.CollectionConverters.* +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic + +val bootstrapServer = "localhost:9092" + +@main def createTopic(): Unit = + val props = new Properties() + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer) + + val adminClient = AdminClient.create(props) + try + val newTopic = new NewTopic("t2", 1, 1: Short) + adminClient.createTopics(Seq(newTopic).asJava).all().get() + println(s"Topic created successfully!") + finally adminClient.close() +end createTopic + +@main def publish(): Unit = + val topic = "t1" + + timed("publish"): + import KafkaStage.* + + val settings = ProducerSettings.default.bootstrapServers(bootstrapServer) + Flow + .repeatEval(randomString()) + // 100 bytes * 10000000 = 1 GB + .take(10_000_000) + .map(msg => ProducerRecord[String, String](topic, msg)) + .mapPublish(settings) + .runDrain() +end publish diff --git a/core/src/main/scala/pres/startHttpServer.scala b/core/src/main/scala/pres/startHttpServer.scala new file mode 100644 index 0000000..bb88c24 --- /dev/null +++ b/core/src/main/scala/pres/startHttpServer.scala @@ -0,0 +1,28 @@ +package pres + +import ox.sleep +import sttp.tapir.* +import sttp.tapir.server.netty.sync.NettySyncServer +import scala.util.Random +import scala.concurrent.duration.DurationInt + +@main def startHttpServer(): Unit = + val work1Endpoint = endpoint.post + .in("work1") + .in(stringBody) + .out(stringBody) + .handleSuccess { in => + sleep(Random.nextInt(1000).milliseconds) + in.reverse + } + + val work2Endpoint = endpoint.post + .in("work2") + .in(stringBody) + .out(stringBody) + .handleSuccess { in => + sleep(Random.nextInt(1000).milliseconds) + in.hashCode().toString() + } + + NettySyncServer().addEndpoints(List(work1Endpoint, work2Endpoint)).startAndWait() diff --git a/core/src/main/scala/pres/util.scala b/core/src/main/scala/pres/util.scala new file mode 100644 index 0000000..5ae2896 --- /dev/null +++ b/core/src/main/scala/pres/util.scala @@ -0,0 +1,15 @@ +package pres + +import scala.util.Random + +def timed[T](name: String)(f: => T): T = + val start = System.currentTimeMillis() + val result = f + val end = System.currentTimeMillis() + println(s"$name took ${end - start} ms") + result +end timed + +def randomString() = Random().alphanumeric.take(100).mkString + +val YesPlease = true diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..477602c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,161 @@ +--- +version: '2' +services: + + broker: + image: confluentinc/cp-server:7.5.0 + hostname: broker + container_name: broker + ports: + - "9092:9092" + - "9101:9101" + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092' + KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter + KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093' + KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + + schema-registry: + image: confluentinc/cp-schema-registry:7.5.0 + hostname: schema-registry + container_name: schema-registry + depends_on: + - broker + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + + connect: + image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0 + hostname: connect + container_name: connect + depends_on: + - broker + - schema-registry + ports: + - "8083:8083" + environment: + CONNECT_BOOTSTRAP_SERVERS: 'broker:29092' + CONNECT_REST_ADVERTISED_HOST_NAME: connect + CONNECT_GROUP_ID: compose-connect-group + CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 + CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter + CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 + # CLASSPATH required due to CC-2422 + CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar + CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" + CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" + CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components" + CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR + + control-center: + image: confluentinc/cp-enterprise-control-center:7.5.0 + hostname: control-center + container_name: control-center + depends_on: + - broker + - schema-registry + - connect + - ksqldb-server + ports: + - "9021:9021" + environment: + CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092' + CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083' + CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors' + CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088" + CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088" + CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + CONTROL_CENTER_REPLICATION_FACTOR: 1 + CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 + CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 + CONFLUENT_METRICS_TOPIC_REPLICATION: 1 + PORT: 9021 + + ksqldb-server: + image: confluentinc/cp-ksqldb-server:7.5.0 + hostname: ksqldb-server + container_name: ksqldb-server + depends_on: + - broker + - connect + ports: + - "8088:8088" + environment: + KSQL_CONFIG_DIR: "/etc/ksql" + KSQL_BOOTSTRAP_SERVERS: "broker:29092" + KSQL_HOST_NAME: ksqldb-server + KSQL_LISTENERS: "http://0.0.0.0:8088" + KSQL_CACHE_MAX_BYTES_BUFFERING: 0 + KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" + KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" + KSQL_KSQL_CONNECT_URL: "http://connect:8083" + KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1 + KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true' + KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true' + + ksqldb-cli: + image: confluentinc/cp-ksqldb-cli:7.5.0 + container_name: ksqldb-cli + depends_on: + - broker + - connect + - ksqldb-server + entrypoint: /bin/sh + tty: true + + ksql-datagen: + image: confluentinc/ksqldb-examples:7.5.0 + hostname: ksql-datagen + container_name: ksql-datagen + depends_on: + - ksqldb-server + - broker + - schema-registry + - connect + command: "bash -c 'echo Waiting for Kafka to be ready... && cub kafka-ready -b broker:29092 1 40 && echo Waiting for Confluent Schema Registry to be ready... && cub sr-ready schema-registry 8081 40 && echo Waiting a few seconds for topic creation to finish... && sleep 11 && tail -f /dev/null'" + environment: + KSQL_CONFIG_DIR: "/etc/ksql" + STREAMS_BOOTSTRAP_SERVERS: broker:29092 + STREAMS_SCHEMA_REGISTRY_HOST: schema-registry + STREAMS_SCHEMA_REGISTRY_PORT: 8081 + + rest-proxy: + image: confluentinc/cp-kafka-rest:7.5.0 + depends_on: + - broker + - schema-registry + ports: + - 8082:8082 + hostname: rest-proxy + container_name: rest-proxy + environment: + KAFKA_REST_HOST_NAME: rest-proxy + KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092' + KAFKA_REST_LISTENERS: "http://0.0.0.0:8082" + KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081' diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..04267b1 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.9.9 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..1583d49 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,2 @@ +val sbtSoftwareMillVersion = "2.0.20" +addSbtPlugin("com.softwaremill.sbt-softwaremill" % "sbt-softwaremill-common" % sbtSoftwareMillVersion) diff --git a/version.sbt b/version.sbt new file mode 100644 index 0000000..da70e5d --- /dev/null +++ b/version.sbt @@ -0,0 +1 @@ +ThisBuild / version := "0.1-SNAPSHOT"