Til hovedinnhold

Prosjekt – Proof of concept med Amazon Web Services og Kafka. #2

Publisert: 10. september 2020

Fortsettelsen i våre eventyr med Event Driven Architecture, Mikroservices og Kafka. Denne gangen koder vi for Kafka med Kotlin

Artikkelnummer Andre artikler i denne serien
1 Prosjekt – Proof of concept med Amazon Web Services og Kafka. #1
2 Prosjekt – Proof of concept med Amazon Web Services og Kafka. #2

Da er vi klare for bloggpost nummer 2 i serien vår med Entur, AWS og Kafka. Forrige gang gikk vi gjennom våre erfaringer rundt det å sette opp AWS-stacken til å fungere med Kafka og for å kjøre applikasjonene våre i Kubernetes. Denne gangen skal vi skrive to applikasjoner som kjører i Kubernetes og skriver til Kafka-bussen.

Valget falt på kotlin fordi det er et populært språk for tiden der det finnes mye dokumentasjon og biblioteker for integrering mot Kafka. Og så vil vi bygge med Gradle for å kunne kjøre gradle-tasks mot applikasjonen.

Da vi begge er nybegynnere i Kafka ønsker vi i førsteomgang kun å få en enkelt tekst-melding til å sendes fra en producer og plukkes opp av en consumer. I Kafka er hvert topic delt inn i et sett med logger kjent som partisjoner. Producerne skriver til slutten på disse loggene, og consumerne leser disse loggene i sitt eget tempo.

Og siden vi ikke har noen erfaringer fra tidligere kaster vi oss bare ut i det og begynner på en Producer.
Her går vi til verks med å starte opp et flunkende nytt Kotlin-prosjekt i IntelliJ. Og lager en enkel main-metode som starter en funksjon som nå bare printer text til console.

fun main(args: Array<String>) {
  MockKafkaProducer().run()
} class MockKafkaProducer {
  fun run() {
    println("Application started")
  }

Dette er ikke veldig spennende, så vi må nesten legge til litt mer. Planen er jo å sende en melding til Kafka; så la oss definere en melding, og la oss også definere en klasse som kan ta seg av selve funksjonaliteten ved å sende meldingen:

fun run() {
  println("Application started")
  val message = "This message is sent from a Kafka-producer "
  MessageSender().send(message)
}

Sånn. La oss nå si oss mer eller mindre ferdig med denne delen og begynne på endelig funksjonalitet i klassen MessageSender. Den første tanken som slår oss er at vi trenger noe som kan sende Kafka-meldinger. KafkaProducer fra Apache virker jo akkurat i blinken, så vi definerer en KafkaProducer i MessageSender som en start. Denne trener også en del properties, så vi lager en getProperties() funksjon vi fyller ut senere også:

class MessageSender {
  fun send(message: String) {
    val props: Properties = getProperties()
    val producer = KafkaProducer<String, String>(props)
     }
}

Så er vi kommet dit at vi kan tenke på at producer kan sende en melding. Vi ser fra dokumentasjonen fra Apache at KafkaProducer har en send-metode som trenger en ProducerRecord. Vi ønsker å holde dette enkelt så i ProducerRecorden vår legger vi kun til en topic, vi har kalt denne "EnturMockKacka-topic1", og meldingen vi har med som argument til metoden.

fun send(message: String) {
  val props: Properties = getProperties()
  val producer = KafkaProducer<String, String>(props)
  val producerRecord = ProducerRecord<String, String>("EnturMockKafka-topic1", message)
  val result: Future<RecordMetadata> = producer.send(producerRecord)
  println(result.get())
}

 Vi gikk gjennom hvordan man setter en topic til Kafka i forrige bloggpost, men for å friske opp hukommelsen så tar vi det igjen.

Å lage “topics” krever at man kobler seg til Zookeeper fra EC2-maskinen. Det gjør vi ved å først finne ZookeeperConnectString:

aws kafka describe-cluster --region #regionen --cluster-arn #MSKClusterArn’en

For deretter å kunne lage “topics” fra terminalen:

bin/kafka-topics.sh --create --zookeeper #ZookeeperConnectString --replication-factor 3 --partitions 3 --topic #Hva-du-vil-kalle-topicen

(“replication-factor” og “partitions” kan velges etter hva man har behov for)

Vi har da satt inn "EnturMockKafka-topic1" som #Hva-du-vil-kalle-topicen i vår kode.

Nå er denne produceren nesten klar, det eneste som mangler for å kunne kjøre programmet er properties. Vi må initialisere KafkaProduce-objektet med properties som inneholder en key og value for serialisering. I dette programmet bruker vi kun en String-melding, derfor trenger vi bare å bruke StringSerializer. 
Vi må også ha med bootstrap server-detaljer for å kommunisere med Kafka. 
Man finner verdien til bootstrap.servers med denne kommandoen:

aws kafka get-bootstrap-brokers --cluster-arn #MSKClusterArn’en

Og den produserer en output som ser cirka slik ut: 

{
    "BootstrapBrokerStringTls": "b-3.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9094,b-1.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9094,b-2.exampleClusterName.abcde.c2.kafka.us-east-1.amazonaws.com:9094"
}

Hele vår getProperties() ser nå slik ut:

fun getProperties(): Properties {
  val properties = Properties()
  properties["bootstrap.servers"] = "b-1.kafkatestcluster.XXXXX.c3.kafka.eu-north-1.amazonaws.com:9094,b-2.kafkatestcluster.XXXXX.c3.kafka.eu-north-1.amazonaws.com:9094,b-3.kafkatestcluster.XXXXX.c3.kafka.eu-north-1.amazonaws.com:9094"
  properties["key.serializer"] = StringSerializer::class.java
  properties["value.serializer"] = StringSerializer::class.java
  properties["security.protocol"] = "SSL"
  return properties
}

Vi setter også security.protocol til SSL. Dette gjør vi fordi Kafka-klusteret vi satte opp brukte TLS-encryption, noe vi fant ut etter lang tid på å løse problemet med at programmet ikke ville sende til Kafka. Det var to måter å komme seg forbi dette problemet på, ene løsnigen er å la kommunikasjon på Kafka-klusteret gå gjennom plaintext, eller sette på secuirty.protocol til SSL her. Siden vi allerede har definert kafka-klusteret vårt så tar vi den enkle utveien og går over SSL.

Nå er vår første producer klar for første test, men før det skjer så har vi funnet det smart å kjøre denne i en loop så den kan produsere noe mer enn kun en enkelt melding; dette gjør testing mye enklere senere. Så vi utvider programmet med en enkel while og lar denne produsere en ny "unik" melding hvert 5 sekund.

fun send(message: String) {
  val props: Properties = getProperties()
  val producer = KafkaProducer<String, String>(props)
  println(" - Starting producer $producer- n")
  var counter = 1
  while (true) {
    try {
      val result = producer.send(ProducerRecord("EnturMockKafka-topic1", "$message, and is #$counter"))
      println("sent a record: $message")
      Thread.sleep(5000)
      result.get()
      println("waiting 5000 millis, message is sent")
      counter++
    } catch (ex: Exception) {
      println("exception occurred: " + ex.stackTrace)
    }
  }
}

(vi gjør ikke noe med resultatet av sendingen enda, men koder det inn som en variabel nå allikevel)

For å kunne kjøre dette programmet på Kubernetes-clusteret vårt så må det kompileres til et docker-image. Dette gjøres i build.gradle.kts-filen og er en ganske omstendelig prosess, men vi tar for oss steg for steg. Vi begynner med å definere en task som bygger prosjektet med en dependecy til fatjar, med main classen som argument. I utgangspunktet er en fat-jar et arkiv som inneholder både klasser og avhengigheter som trengs for å kjøre et program.

tasks {
  startScripts {
    mainClassName = "no.item.kafka.producer.MockKafkaProducerKt"
  }   build {
    dependsOn(fatJar)
  }
}
val fatJar = task("fatJar", type = Jar::class) {
  baseName = "${project.name}-fat"
  manifest {
    attributes["Implementation-Title"] = "Gradle Jar File Example"
    attributes["Implementation-Version"] = version
    attributes["Main-Class"] = "no.item.kafka.producer.MockKafkaProducerKt"
  }
  exclude("META-INF/*.RSA", "META-INF/*.SF", "META-INF/*.DSA")
  from(configurations.runtimeClasspath.get().map({ if (it.isDirectory) it else zipTree(it) }))
  with(tasks.jar.get() as CopySpec)
}

Så fortsetter vi med å legge til en docker-plugin for kotlin:

plugins{
    ... 
    id("com.bmuschko.docker-remote-api") version "6.5.0"
}

og en variabel-task som brukes for å bygge prosjektet til docker-image.

val buildImage = task("buildImage", DockerBuildImage::class) {
    inputDir.set(file("."))
    images.add("XXXXXX.dkr.ecr.eu-north-1.amazonaws.com/kafka-producer:latest")
}

Og et par variabeler der taskene er script som logger inn på AWS ECR slik at imaget kan pushes opp dit.

val pushImage = task("pushImage", DockerPushImage::class) {
  dependsOn(buildImage)
  images.add("XXXXXX.dkr.ecr.eu-north-1.amazonaws.com/kafka-producer:latest")
} val dockerInit = task("dockerInit", Exec::class) {
  workingDir = file(".")
  commandLine = listOf("./docker-login.sh")
}
val dockerInitWin = task("dockerInitWin", Exec::class) {
  workingDir = file(".")
  commandLine = listOf("cmd", "/c", "docker-login.bat")
}

Til slutt registrerer vi tre tasks som er avhengige av disse variablene vi akkurat definerte:

tasks.register("deploy") {
    dependsOn(tasks.build)
    dependsOn(pushImage)
} tasks.register("docker-init") {
  dependsOn(dockerInit)
} tasks.register("docker-init-win") {
  dependsOn(dockerInitWin)
}

Hele build.grade.kts-filen finner du her på GitHub. Den inneholder litt mer konfigurasjon, som ktlint, men disse stegene er de nødvendige for å kunne bruke programmet i kubernetes.
Docker bruker Dockerfile til å generere et image. Vi tar utgangspunkt i et Java image og kopierer over jar-filen, deretter starter vi applikasjonen ved oppstart av containeren.

Da er vår producer ferdig; og hele koden kan du finne her på GitHub.


Så la oss starte på vår Consumer. Vi holder logikken like enkel, så vår consumer skal bare lese meldingene produceren legger ut uten å behandle dem.
Vi begynner med et tomt prosjekt igjen og definerer en metode som ikke gjør noe annet enn å printe ut en string.

fun main() {
  MockKafkaConsumer().run()
} class MockKafkaConsumer {
  fun run() {
    println("Starting consumer")
}  

Der vi for produeren så at vi trengte noe for å sende en melding så trenger vi her noe som kan lese den  Apache's KafkaConsumer, som du kan finne dokumentasjonen til her, vil gjøre dette. Og utifra dokumentasjonenn trenger også denne Properties så vi legger til disse:

val props: Properties = getProperties()
val consumer: KafkaConsumer<String, String> = KafkaConsumer(props)

La oss definere properties slik:

fun getProperties(): Properties {
  val properties = Properties()
  with(properties) {
    put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "b-3.enturmockkafkacluster.XXXX.c3.kafka.eu-north-1.amazonaws.com:9094,b-2.enturmockkafkacluster.XXXX.c3.kafka.eu-north-1.amazonaws.com:9094,b-1.enturmockkafkacluster.XXX.c3.kafka.eu-north-1.amazonaws.com:9094")
    put(ConsumerConfig.GROUP_ID_CONFIG, "EnturMockKafkaConsumerGroup")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer().javaClass.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer().javaClass.name)
    put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  }
  return properties
}

(Her lager vi properties på en annen måte enn i produceren siden vi ville vise at man kan bruke to forskjellige metoder og resultatet blir det samme) 

Group.id er en streng som unikt identifiserer gruppen av consumer-prosesser som denne consumeren tilhører. Group.id sammen med partisonene er det som definerer hvordan applikasjonene kjører i parallell. Applikasjoner i samme gruppe deler partisonene mellom seg, slik at hver applikasjon vil lese fra samme partison(er) hver gang. Hvis man har flere applikasjoner i en gruppe enn det er partisoner, vil de applikasjonene ikke kjøre ved mindre en av de andre går ned.

Vi har også med de samme serverne som tildigere, i tillegg til deserializer. De to nye feltene ENABLE_AUTO_COMMIT og AUTO_OFFSET_RESET:

AUTO_OFFSET_RESET: "Offset" er et stykke metadata, et heltall som kontinuerlig øker for hver melding som mottas i en partisjon. Hver melding vil ha en unik offset verdi i en partisjon.

Når en consumer leser meldingene fra partisjonen, lar den Kafka vite offset av den sist forbrukte meldingen. Denne offseten er lagret i et emne som heter _consumer_offsets. Ved å gjøre dette kan en consumer stoppe og starte på nytt uten å glemme hvilke meldinger den har konsumert.
Med auto_offset_reset bestemmer vi hva som skal skje når det ikke er noen opprinnelig offset i Kafka eller hvis den nåværende offseten ikke eksisterer lenger på serveren. Vi setter her denne til å nullstille offset automatisk til den tidligste.

ENABLE_AUTO_COMMIT: Som standard vil en consumer commite sin offset til Kafkahvert 5. sekund, eller hver gang data hentes fra det angitte emnet. Det vi kan gjøre er å kjøre offset av meldinger manuelt etter behandling. Dette gir oss full kontroll over når vi vurderer en melding behandlet og klar til å la Kafka få vite det. Derfor setter vi denne til false. Nå kan vi commite offsetet vårt manuelt etter at behandlingen har funnet sted, og hvis consumeren krasjer mens den behandler en melding vil den begynne å lese fra den samme offset.

Tilbake til consumern, så må den settes til å abbonere på topicen vår og consume meldingene som sendes. Det gjøres med denne linjen vi legger til i programmet vårt:

consumer.subscribe(Collections.singleton("EnturMockKafka-topic1"))

Vi vil sjekke kafka-bussen etter nye meldinger kontinuerlig, derfor setter vi en while-loop og ser etter nye meldinger hvert sekund:

while (true) {
  val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(1000))
  if (records.count() == 0) {
    continue
  }
  println("Got some records....")
  records.iterator().forEach {
    val record: String = it.value()
    println(record)
  }
  consumer.close()
}
 
 
Hele vår consumer ser nå slik ut:

fun main() {
  MockKafkaConsumer().run()
} class MockKafkaConsumer {
  fun run() {
    println("Starting consumer")     val props: Properties = getProperties()
    val consumer: KafkaConsumer<String, String> = KafkaConsumer(props)
    consumer.subscribe(Collections.singleton("EnturMockKafka-topic1"))     while (true) {
      val records: ConsumerRecords<String, String> = consumer.poll(Duration.ofMillis(1000))
      if (records.count() == 0) {
        continue
      }
      println("Got some records....")
      records.iterator().forEach {
        val record: String = it.value()
        println(record)
      }
      consumer.close()
    }
  }
}
 
 

Koden kan man  finne her på github.

 

Nå er det på tide å sjekke at alt vi har gjort hittill fungerer som ønsket. 

Første punkt er å bygge disse to prosjektene, vi gjør det med kommandoen:

gradlew deploy  

(deploy for å bygge og pushe opp til ECR)
 

Så starter vi opp EKS (om det ikke kjører fra før. Vi har prøvd å slå av våre AWS-tjenester når de ikke er i bruk.), her bruker vi CloudFormation, enten via console eller AWS.

Så oppdaterer vi kubernetes config-filen slik at vi kan koble oss til kubernetes med kubectl.

aws eks --region $region update-kubeconfig --name "entur-eks-cluster" --profile $profile

kubectl apply -f yourfile.yaml --kubeconfig=$HOME/.kube/config

Så er prosjektene oppe og snakker til hverandre. 
Man kan se println'ene som produceres med 

kubectl logs #label

 
Det var programmering og oppsett av våre to førte kafka-applikasjoner. Bli med i neste episode da vi skal hente ut data fra Entur gjennom deres åpne API med GraphQL queries og behandle denne dataen i kotlin og presentere på kafka-bussen vår.  

 

 

 

 

Kontakt oss

Vil du kontakte oss kan du sende mail på firmapost@item.no .

Nyhetsbrev

Ønsker du å motta vårt nyhetsbrev:

Meld meg på / Meld meg av