
Update an existing microservice to receive events from the Event Hub

In this task, you will update the vets microservice to receive events from the telemetry event hub. You can use the following guidance to implement these changes:

Step by step guidance

  1. Reading messages from the event hub also requires checkpointing, that is, recording which part of the stream you have already read and processed. You will use a storage account to keep track of the checkpoints. Create the storage account and a container with the following steps:

    STORAGE_ACCOUNT_NAME=stg$APPNAME$UNIQUEID
    echo $STORAGE_ACCOUNT_NAME
    az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku "Standard_LRS" 
    az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv
    STORAGE_ACCOUNT_ID=$(az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv)
    echo $STORAGE_ACCOUNT_ID
    
    STORAGE_CONTAINER=eventhubs-binder
    az storage container create --name $STORAGE_CONTAINER --account-name $STORAGE_ACCOUNT_NAME --public-access container --auth-mode login
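
    Storage account names must be 3 to 24 characters long and may contain only lowercase letters and digits, so the stg$APPNAME$UNIQUEID value is rejected if, for example, $APPNAME contains uppercase letters or hyphens. The following is a minimal sketch of a check you could run before az storage account create. The function name valid_storage_name is hypothetical and not part of the lab.

    ```shell
    # Hypothetical helper (not part of the lab): succeeds only when the argument
    # is 3-24 characters long and consists of lowercase letters and digits.
    # The body runs in a subshell so the LC_ALL change does not leak out.
    valid_storage_name() (
        LC_ALL=C
        name=$1
        [ "${#name}" -ge 3 ] && [ "${#name}" -le 24 ] || exit 1
        case $name in
            *[!a-z0-9]*) exit 1 ;;
        esac
        exit 0
    )

    # Example usage with the variable from this step:
    # valid_storage_name "$STORAGE_ACCOUNT_NAME" || echo "invalid name: $STORAGE_ACCOUNT_NAME"
    ```
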
    
  2. Checkpointing uses AAD authentication with your user-assigned managed identity. Give the managed identity access to the storage account and the storage container:

    az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Account Contributor' --scope $STORAGE_ACCOUNT_ID
    az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Contributor' --scope $STORAGE_ACCOUNT_ID
    az role assignment create --assignee $USER_ASSIGNED_CLIENT_ID --role 'Storage Blob Data Owner' --scope $STORAGE_ACCOUNT_ID/blobServices/default/containers/$STORAGE_CONTAINER
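
    The resource ID of a single blob container is the storage account ID followed by /blobServices/default/containers/ and the container name. As a rough sketch, the helper below builds that scope string, and the commented usage lists the role assignments of the managed identity so you can confirm that all three roles are present. The function name container_scope is hypothetical.

    ```shell
    # Hypothetical helper (not part of the lab): prints the resource ID of a
    # blob container, given the storage account ID and the container name.
    container_scope() {
        printf '%s/blobServices/default/containers/%s\n' "$1" "$2"
    }

    # Example usage (needs the Azure CLI and the variables from steps 1 and 2):
    # container_scope "$STORAGE_ACCOUNT_ID" "$STORAGE_CONTAINER"
    # az role assignment list --assignee "$USER_ASSIGNED_CLIENT_ID" --all \
    #     --query "[].{role:roleDefinitionName, scope:scope}" -o table
    ```
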
    
  3. In the config repository, add the configuration for receiving messages from the event hub. Replace the contents of the current application.yml file with the contents of the 0603_application.yml file. Make sure you fill in your current MySQL server name on line 12 and the name of your event hub namespace on line 28. This file includes the following changes:

    • An additional consume binding for the $Default consumer group of the telemetry event hub on lines 14 to 16.
    • An additional checkpoint-store for the eventhubs-binder container of your storage account on lines 29 to 32. Make sure you fill in the name of your storage account on line 32.
    • An additional spring.cloud.eventhubs.bindings configuration that sets the checkpoint mode to MANUAL on lines 33 to 38.
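
    If the container name in the checkpoint-store configuration does not match the container created in step 1, checkpointing cannot work. The sketch below checks a YAML file for a given key and value. It assumes the file uses the keys container-name and account-name, as in Spring Cloud Azure's checkpoint-store properties, so verify the key names against your own file. The function name yaml_has is hypothetical.

    ```shell
    # Hypothetical helper (not part of the lab): succeeds when FILE contains a
    # line of the form "KEY: VALUE". Indentation is ignored; quoted values and
    # values containing regex metacharacters are not handled.
    yaml_has() {
        grep -Eq "^[[:space:]]*$2:[[:space:]]*$3[[:space:]]*\$" "$1"
    }

    # Example usage in the config repository (key names are assumptions):
    # yaml_has application.yml container-name "$STORAGE_CONTAINER" || echo "container name mismatch"
    # yaml_has application.yml account-name "$STORAGE_ACCOUNT_NAME" || echo "account name mismatch"
    ```
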

  4. Now that the configuration is done, update the spring-petclinic-vets-service. In your local application repository, open the pom.xml file of the spring-petclinic-vets-service microservice in a text editor, add another dependency element within the <!-- Spring Cloud --> section of the <dependencies> element, and save the change:

     <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
     </dependency>  
    
  5. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, update the VetsServiceApplication.java file with the below code:

    package org.springframework.samples.petclinic.vets;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.samples.petclinic.vets.system.VetsProperties;
       
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
       
    /**
     * @author Maciej Szarlinski
     */
    @EnableDiscoveryClient
    @SpringBootApplication
    @EnableConfigurationProperties(VetsProperties.class)
    public class VetsServiceApplication {
       
    	private static final Logger LOGGER = LoggerFactory.getLogger(VetsServiceApplication.class);
       
    	public static void main(String[] args) {
    		SpringApplication.run(VetsServiceApplication.class, args);
    	}
       
    	@ServiceActivator(inputChannel = "telemetry.$Default.errors")
    	public void consumerError(Message<?> message) {
    		LOGGER.error("Handling consumer ERROR: {}", message);
    	}
    }
    

    This adds a consumerError method to this class, which will be called in case of errors with the connection to your event hub.

  6. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, add a new services folder and create an EventHubListener.java file in this folder with the below contents:

    package org.springframework.samples.petclinic.vets.services;
       
       
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
       
    import java.util.function.Consumer;
       
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
       
    @Configuration
    public class EventHubListener {
       
        // Log under this class's own name so listener output is easy to find.
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubListener.class);
       
        @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload(),
                        message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                        message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                        message.getHeaders().get(EventHubsHeaders.OFFSET),
                        message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
       
                checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
            };
        }
    }   
    

    This class has a consume method for consuming messages from the event hub. It uses a checkpointer to mark which messages in the stream have already been processed, and the logger writes out each message that is received.

  7. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/resources folder, update the contents of the application.yml file with the below contents:

    spring:
      application:
        name: vets-service
      config:
        import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
      cache:
        cache-names: vets
      profiles:
        active: production
      cloud:
        function:
          definition: consume
    

    This adds the Spring Cloud Function configuration for the consume method, so that it is bound to the event hub.

  8. Make sure you have saved all the files you just changed. In the Git Bash window, navigate back to the root folder of the Spring Petclinic repository and rebuild the vets microservice:

    cd ~/workspaces/java-microservices-aks-lab/src
    mvn clean package -DskipTests -rf :spring-petclinic-vets-service
    
  9. Navigate to the staging-acr directory, copy the jar file of the vets-service, then rebuild and push the container image:

    cd staging-acr
    rm spring-petclinic-vets-service-$VERSION.jar
       
    cp ../spring-petclinic-vets-service/target/spring-petclinic-vets-service-$VERSION.jar spring-petclinic-vets-service-$VERSION.jar
    docker build -t $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION \
        --build-arg ARTIFACT_NAME=spring-petclinic-vets-service-$VERSION.jar \
        --build-arg APP_PORT=8080 \
        --build-arg AI_JAR=ai.jar \
        .
       
    docker push $MYACR.azurecr.io/spring-petclinic-vets-service:$VERSION
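
    The build and push commands rely on $VERSION and $MYACR, which are lost if you opened a new terminal since setting them. If either is empty, the jar and image names silently come out wrong. As a small sketch, the hypothetical helper below reports which of the named variables are unset or empty, so you can run it before the commands above.

    ```shell
    # Hypothetical helper (not part of the lab): prints the names of all listed
    # variables that are unset or empty, and returns non-zero if any is missing.
    missing_vars() {
        missing=""
        for var in "$@"; do
            eval "value=\${$var:-}"
            [ -n "$value" ] || missing="$missing $var"
        done
        [ -z "$missing" ] && return 0
        echo "missing:$missing"
        return 1
    }

    # Example usage before the build and push commands:
    # missing_vars VERSION MYACR
    ```
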
    
  10. Delete the currently running vets service pod and double check that it starts running again.

    kubectl get pods
    kubectl delete pod <vets-service-pod>
       
    kubectl get pods -w
       
    kubectl logs <vets-service-pod> -f
    

    You should see messages in the logs related to the connection to the event hub.
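
    Once events arrive on the telemetry event hub, the listener logs a line starting with "New message received" for each one. A quick way to confirm that messages are being consumed is to count those lines, as in this sketch. The function name count_received is hypothetical.

    ```shell
    # Hypothetical helper (not part of the lab): reads log lines on stdin and
    # prints how many contain the text that the consume() listener logs.
    count_received() {
        # grep -c exits non-zero when the count is 0, so keep the exit status clean
        grep -c "New message received" || true
    }

    # Example usage (replace the placeholder with your pod name):
    # kubectl logs <vets-service-pod> | count_received
    ```
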

    If you see errors during the startup of the vets service, check whether the error message explains why it is not working as expected, and double check all the above steps. To redeploy the vets service: rebuild the jar file and copy it to the staging-acr directory, rebuild the container image, and delete the previously running vets-service pod. You may also delete the running config-server pod before deleting the vets-service pod, so that you are sure you are running with the latest config. Make sure the config-server starts running properly again as well.