Update an existing microservice to receive events from the Event Hub
In this task, you will update the vets microservice to receive events from the telemetry event hub. You can use the following guidance to implement these changes:
- Sending and Receiving Message by Azure Event Hubs and Spring Cloud Stream Binder Eventhubs in Spring Boot Application
- Spring Cloud Stream with Azure Event Hubs
- Use Java to send events to or receive events from Azure Event Hubs
Step by step guidance
-
Reading the messages from the event hub, also entails that you will need to checkpoint what part of the stream you already read and processed. For keeping track of the checkpoints you will use a storage account. Create the storage account and a container with the below steps:
STORAGE_ACCOUNT_NAME=stg$APPNAME$UNIQUEID echo $STORAGE_ACCOUNT_NAME az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku "Standard_LRS" az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv STORAGE_ACCOUNT_ID=$(az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv) echo $STORAGE_ACCOUNT_ID STORAGE_CONTAINER=eventhubs-binder az storage container create --name $STORAGE_CONTAINER --account-name $STORAGE_ACCOUNT_NAME --public-access container --auth-mode login
-
The checkpointing will be done through AAD authentication with your user assigned managed identity. You will need to give the managed identity access to the storage container:
az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Account Contributor' --scope $STORAGE_ACCOUNT_ID az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Contributor' --scope $STORAGE_ACCOUNT_ID az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Owner' --scope $STORAGE_ACCOUNT_ID/containers/$STORAGE_CONTAINER
-
In the config repository you will need to add the configuration for receiving messages from the event hub. Replace the contents of the current
application.yml
file with the contents of the 0603_application.yml file. Make sure you fill out your current MySQL server name on line12
and the name of your event hub namespace on line28
. This file includes the following changes:- An additional
consume
binding for the$Default
consumer group of thetelemetry
event hub on line14
to16
. - An additional
checkpoint-store
for theeventshubs-binder
container of your storage account on lines29
to32
. Make sure you fill out the name of your storage account on line32
. - An additional
spring.cloud.eventhubs.bindings
configuration indicating checkpointing will be doneMANUAL
on lines33
to38
.
- An additional
-
Now that the configuration is done, you will update the
spring-petclinic-vets-service
. In your local application repository, use your favorite text editor to open thepom.xml
file of thespring-petclinic-vets-service
microservice, add to it another dependency element within the<!-- Spring Cloud -->
section of the<dependencies>
element, and save the change:<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId> </dependency>
-
In the
spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets
folder, update theVetsServiceApplication.java
file with the below code:package org.springframework.samples.petclinic.vets; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.samples.petclinic.vets.system.VetsProperties; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Maciej Szarlinski */ @EnableDiscoveryClient @SpringBootApplication @EnableConfigurationProperties(VetsProperties.class) public class VetsServiceApplication { private static final Logger LOGGER = LoggerFactory.getLogger(VetsServiceApplication.class); public static void main(String[] args) { SpringApplication.run(VetsServiceApplication.class, args); } @ServiceActivator(inputChannel = "telemetry.$Default.errors") public void consumerError(Message<?> message) { LOGGER.error("Handling consumer ERROR: " + message); } }
This adds a
consumeError
method to this class, which will be called in case of errors with the connection to your event hub. -
In the
spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets
folder, add a newservices
folder and create aEventHubListener.java
in this folder with the below contents.package org.springframework.samples.petclinic.vets.services; import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders; import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.samples.petclinic.vets.VetsServiceApplication; import org.springframework.stereotype.Service; import java.util.function.Consumer; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @Configuration public class EventHubListener { private static final Logger LOGGER = LoggerFactory.getLogger(VetsServiceApplication.class); private int i = 0; @Bean public Consumer<Message<String>> consume() { return message -> { Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}", message.getPayload(), message.getHeaders().get(EventHubsHeaders.PARTITION_KEY), message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER), message.getHeaders().get(EventHubsHeaders.OFFSET), message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME) ); checkpointer.success() .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(error -> LOGGER.error("Exception found", error)) .block(); }; } }
This class has a
consume
method for consuming messages from the event hub. It uses a checkpointer to indicate which messages in the stream already got processed. With the logger we write out the message that got received. -
In the
spring-petclinic-microservices/spring-petclinic-vets-service/src/main/resources
folder, update the contents of theapplication.yml
file with the below contents:spring: application: name: vets-service config: import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/} cache: cache-names: vets profiles: active: production cloud: function: consume;
This adds the configuration for the cloud function of the consume method.
-
Make sure you saved all the files you just changed. In the Git Bash window, navigate back to the root folder of the spring petclinic repository and rebuild the vets microservice.
cd ~/workspaces/java-microservices-aks-lab/src mvn clean package -DskipTests -rf :spring-petclinic-vets-service
-
Navigate to the
staging-acr
directory, copy the jar file of the vets-service and rebuild the container.cd staging-acr rm spring-petclinic-vets-service-$VERSION.jar cp ../spring-petclinic-vets-service/target/spring-petclinic-vets-service-$VERSION.jar spring-petclinic-vets-service-$VERSION.jar docker build -t $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION \ --build-arg ARTIFACT_NAME=spring-petclinic-vets-service-$VERSION.jar \ --build-arg APP_PORT=8080 \ --build-arg AI_JAR=ai.jar \ . docker push $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION
-
Delete the current running vets service pod and double check that it starts running again.
kubectl get pods kubectl delete pod <vets-service-pod> kubectl get pods -w kubectl logs <vets-service-pod> -f
You should see in the logs messages related to the connection to the event hub.
In case you see errors during the startup of the vets service, double check the error message whether it provides info on why it’s not working as expected. Double check all the above steps. To redeploy the vets service: rebuild the jar file and copy it to the staging-acr directory, rebuild the container image and stop the previously running pod of the vets-service. Additionally you may also delete the previous version of the config-server, before deleting the vets-service pod, so you are sure you are running with latest config. Also make sure the config-server starts running properly again as well.