
Update an existing microservice to receive events from the Event Hub

In this task, you will update the vets microservice to receive events from the telemetry event hub. You can use the following guidance to implement these changes:

Step by step guidance

  1. Reading messages from the event hub also requires checkpointing: recording which part of the stream you have already read and processed. You will use a storage account to keep track of the checkpoints. Create the storage account with the steps below (you will create the container in a later step):

    STORAGE_ACCOUNT_NAME=stg$APPNAME$UNIQUEID
    echo $STORAGE_ACCOUNT_NAME
    az storage account create --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --location $LOCATION --sku "Standard_LRS" 
    az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv
    STORAGE_ACCOUNT_ID=$(az storage account show --name $STORAGE_ACCOUNT_NAME --resource-group $RESOURCE_GROUP --query id -o tsv)
    echo $STORAGE_ACCOUNT_ID
    
  2. For creating the storage container, you will need to make sure your current account has sufficient permissions on the storage account.

    CURRENT_USER_OBJECTID=$(az ad signed-in-user show --query id --output tsv)
    az role assignment create --assignee $CURRENT_USER_OBJECTID --role 'Storage Account Contributor' --scope $STORAGE_ACCOUNT_ID
    az role assignment create --assignee $CURRENT_USER_OBJECTID --role 'Storage Blob Data Contributor' --scope $STORAGE_ACCOUNT_ID
    
  3. You can now create the storage container.

    STORAGE_CONTAINER=eventhubs-binder
    az storage container create --name $STORAGE_CONTAINER --account-name $STORAGE_ACCOUNT_NAME --public-access container --auth-mode login
    
  4. The checkpointing will be done through AAD authentication with the user assigned managed identity of the vets service. You will need to give the managed identity access to the storage container:

    az role assignment create --assignee $VETS_SERVICE_CID --role 'Storage Account Contributor' --scope $STORAGE_ACCOUNT_ID
    az role assignment create --assignee $VETS_SERVICE_CID --role 'Storage Blob Data Contributor' --scope $STORAGE_ACCOUNT_ID
    az role assignment create --assignee $VETS_SERVICE_CID --role 'Storage Blob Data Owner' --scope $STORAGE_ACCOUNT_ID/containers/$STORAGE_CONTAINER
    
  5. In the config repository you will need to add the configuration for receiving messages from the event hub. Replace the contents of the current application.yml file with the contents of this application.yml file. Make sure you fill out your current Key Vault name on line 36 and the name of your event hub namespace on line 54. This file includes the following changes:

    • An additional consume binding for the $Default consumer group of the telemetry event hub on lines 40 to 42.
    • An additional checkpoint-store for the eventhubs-binder container of your storage account on lines 56 to 58. Make sure you fill out the name of your storage account on line 58.
    • An additional spring.cloud.eventhubs.bindings configuration indicating that checkpointing will be done in MANUAL mode on lines 59 to 64.
  6. Now that the configuration is done, you will update the spring-petclinic-vets-service. In your local application repository, use your favorite text editor to open the pom.xml file of the spring-petclinic-vets-service microservice, add to it another dependency element within the <!-- Spring Cloud --> section of the <dependencies> element, and save the change:

     <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
     </dependency>  
    
  7. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, update the VetsServiceApplication.java file with the below code:

    package org.springframework.samples.petclinic.vets;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.samples.petclinic.vets.system.VetsProperties;
       
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
       
    /**
     * @author Maciej Szarlinski
     */
    @EnableDiscoveryClient
    @SpringBootApplication
    @EnableConfigurationProperties(VetsProperties.class)
    public class VetsServiceApplication {
       
    	private static final Logger LOGGER = LoggerFactory.getLogger(VetsServiceApplication.class);
       
    	public static void main(String[] args) {
    		SpringApplication.run(VetsServiceApplication.class, args);
    	}
       
    	// Receives error messages published for the telemetry event hub and $Default consumer group.
    	@ServiceActivator(inputChannel = "telemetry.$Default.errors")
    	public void consumerError(Message<?> message) {
    		LOGGER.error("Handling consumer ERROR: {}", message);
    	}
    }
    

    This adds a consumerError method to this class, which will be called in case of errors while consuming messages from your event hub.
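    The channel name passed to @ServiceActivator is not arbitrary: Spring Cloud Stream names a binding's error channel after the destination and the consumer group. The sketch below only illustrates that naming pattern. The class and method names are hypothetical and are not part of Spring or the lab code.

```java
// Illustrative helper only; Spring Cloud Stream builds this name internally.
class ErrorChannelNameSketch {

    // Joins destination and consumer group the way a binding's error channel is named.
    static String errorChannel(String destination, String group) {
        return destination + "." + group + ".errors";
    }

    public static void main(String[] args) {
        // "telemetry" is the event hub and "$Default" the consumer group used in this lab.
        System.out.println(errorChannel("telemetry", "$Default"));
    }
}
```

    If you later consume from a different event hub or consumer group, the inputChannel value has to change accordingly.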

  8. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/java/org/springframework/samples/petclinic/vets folder, add a new services folder and create an EventHubListener.java file in this folder with the below contents.

    package org.springframework.samples.petclinic.vets.services;
       
       
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;
       
    import java.util.function.Consumer;
       
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
       
    @Configuration
    public class EventHubListener {
       
        // Log under this class's own name so log entries can be traced to the listener.
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubListener.class);
       
        @Bean
        public Consumer<Message<String>> consume() {
            return message -> {
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload(),
                        message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                        message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                        message.getHeaders().get(EventHubsHeaders.OFFSET),
                        message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
       
                checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
            };
        }
    }   
    

    This class defines a consume bean for consuming messages from the event hub. It logs each received message and then uses a checkpointer to record which messages in the stream have already been processed.
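    To make the idea of checkpointing more concrete, here is a minimal, self-contained sketch of what a checkpoint buys you. It is not the Azure SDK: the class, the in-memory map standing in for the storage container, and the event names are all hypothetical. It only shows that a consumer which records the last processed position can resume from there after a restart instead of re-reading the stream.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; none of these names come from the Azure SDK.
class CheckpointSketch {

    // Reads up to maxEvents events that follow the stored checkpoint for a partition,
    // recording a new checkpoint after each event (the role checkpointer.success() plays).
    static List<String> consume(Map<String, Integer> store, String partitionId,
                                List<String> events, int maxEvents) {
        // -1 means no event of this partition has been checkpointed yet.
        int last = store.getOrDefault(partitionId, -1);
        List<String> processed = new ArrayList<>();
        int end = Math.min(events.size(), last + 1 + maxEvents);
        for (int seq = last + 1; seq < end; seq++) {
            processed.add(events.get(seq));
            store.put(partitionId, seq);
        }
        return processed;
    }

    public static void main(String[] args) {
        // Stands in for the blob container that holds the checkpoints.
        Map<String, Integer> store = new HashMap<>();
        List<String> partition = List.of("e0", "e1", "e2", "e3", "e4");

        // First run stops after two events, as if the service were restarted.
        System.out.println(consume(store, "0", partition, 2));
        // The next run resumes after the checkpoint instead of re-reading e0 and e1.
        System.out.println(consume(store, "0", partition, 10));
    }
}
```

    In the lab, the stored position lives in the storage container rather than in memory, which is why the vets service's managed identity needs access to it.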

  9. In the spring-petclinic-microservices/spring-petclinic-vets-service/src/main/resources folder, update the contents of the application.yml file with the below contents:

    spring:
      application:
        name: vets-service
      config:
        import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
      cache:
        cache-names: vets
      cloud:
        function:
          definition: consume
    

    This registers the consume bean as the Spring Cloud Function definition, so that incoming event hub messages are routed to it.
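    The function name also determines the name of the input binding. In Spring Cloud Stream's functional model, the input binding of a function is named after the function, followed by -in- and an index. The sketch below illustrates that convention, assuming the consume binding in the config repository follows it. The helper class is hypothetical and not part of Spring.

```java
// Illustrative helper only; Spring Cloud Stream derives binding names internally.
class BindingNameSketch {

    // Builds the input binding name for the given function bean and argument index.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    public static void main(String[] args) {
        // The consume bean has a single input, so its index is 0.
        System.out.println(inputBinding("consume", 0));
    }
}
```

    If you rename the consume bean, the function definition and any binding configuration keyed on this name need to be renamed with it.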

  10. Make sure you saved all the files you just changed. In the Git Bash window, navigate back to the root folder of the spring petclinic repository and rebuild the vets microservice.

    cd ~/workspaces/java-microservices-asa-e-lab/src
    mvn clean package -DskipTests -rf :spring-petclinic-vets-service
    
  11. Redeploy the vets-service microservice to Azure Spring Apps.

    az spring app deploy --name ${VETS_SERVICE} \
        --config-file-patterns ${VETS_SERVICE}  \
        --artifact-path ${VETS_SERVICE_JAR}