Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Update an existing microservice to send events to the Event Hub

You will now implement the functionality that will allow you to emulate sending events to the telemetry Event Hub. For this you will update the customers-service. Each time a new customer gets created, send a new event to the event hub, together with 100 new pet events (we are going to simulate some dummy load).

Step by step guidance

  1. In the customers-service pom.xml file, add the following extra dependency:

        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
        </dependency>
    
  2. Replace the contents of the src/main/java/org/springframework/samples/petclinic/customers/CustomersServiceApplication.java file with:

    package org.springframework.samples.petclinic.customers;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
       
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
       
    /**
     * Spring Boot entry point for the customers service.
     *
     * <p>Besides starting the application, this class hosts the handler for the
     * {@code telemetry.errors} channel and logs every message that arrives on it.
     *
     * @author Maciej Szarlinski
     */
    @EnableDiscoveryClient
    @SpringBootApplication
    public class CustomersServiceApplication {

        private static final Logger LOGGER = LoggerFactory.getLogger(CustomersServiceApplication.class);

        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments, passed through to Spring Boot
         */
        public static void main(String[] args) {
            SpringApplication.run(CustomersServiceApplication.class, args);
        }

        /**
         * Logs a message received on the {@code telemetry.errors} channel at ERROR level.
         *
         * @param message the message delivered on the error channel
         */
        @ServiceActivator(inputChannel = "telemetry.errors")
        public void producerError(Message<?> message) {
            // Parameterized logging: the message is only rendered when ERROR is enabled.
            LOGGER.error("Handling Producer ERROR: {}", message);
        }
    }
    

    This adds an extra logger and a method that will be called in case of errors in the sending of events.

  3. Replace the contents of the src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/web/OwnerResource.java file with:

    package org.springframework.samples.petclinic.customers.web;
       
    import io.micrometer.core.annotation.Timed;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.http.HttpStatus;
    import org.springframework.samples.petclinic.customers.model.Owner;
    import org.springframework.samples.petclinic.customers.model.OwnerRepository;
    import org.springframework.web.bind.annotation.*;
       
    import jakarta.validation.Valid;
    import jakarta.validation.constraints.Min;
    import java.util.List;
    import java.util.Optional;
       
    import reactor.core.publisher.Sinks;
    import reactor.core.publisher.Flux;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import java.util.function.Supplier;
       
    import org.springframework.beans.factory.annotation.Autowired;
       
    /**
     * REST endpoints for owners, exposed under {@code /owners}.
     *
     * <p>Creating an owner additionally publishes telemetry messages to the injected sink:
     * one "owner created" message followed by {@link #PET_EVENT_COUNT} dummy pet messages.
     *
     * @author Juergen Hoeller
     * @author Ken Krebs
     * @author Arjen Poutsma
     * @author Michael Isvy
     * @author Maciej Szarlinski
     */
    @RequestMapping("/owners")
    @RestController
    @Timed("petclinic.owner")
    @RequiredArgsConstructor
    class OwnerResource {

        private static final Logger LOGGER = LoggerFactory.getLogger(OwnerResource.class);

        /** Number of dummy pet messages published for every created owner. */
        private static final int PET_EVENT_COUNT = 100;

        private final OwnerRepository ownerRepository;

        // Field injection is kept so the Lombok-generated constructor (repository only) is unchanged.
        @Autowired
        private Sinks.Many<Message<String>> many;

        /**
         * Create Owner.
         *
         * <p>The owner is saved first; telemetry messages are only published once the save
         * has returned, so a failed save never produces "owner created" messages.
         *
         * @param owner the validated owner from the request body
         * @return the owner as returned by the repository
         */
        @PostMapping
        @ResponseStatus(HttpStatus.CREATED)
        public Owner createOwner(@Valid @RequestBody Owner owner) {
            final Owner savedOwner = ownerRepository.save(owner);
            publishOwnerCreatedEvents(savedOwner);
            return savedOwner;
        }

        /**
         * Read single Owner.
         *
         * @param ownerId id of the owner to look up (minimum 1)
         * @return the repository lookup result, empty when no owner has that id
         */
        @GetMapping(value = "/{ownerId}")
        public Optional<Owner> findOwner(@PathVariable("ownerId") @Min(1) int ownerId) {
            return ownerRepository.findById(ownerId);
        }

        /**
         * Read List of Owners.
         *
         * @return all owners returned by the repository
         */
        @GetMapping
        public List<Owner> findAll() {
            return ownerRepository.findAll();
        }

        /**
         * Update Owner.
         *
         * @param ownerId id of the owner to update (minimum 1)
         * @param ownerRequest the validated new field values from the request body
         * @throws ResourceNotFoundException when no owner with {@code ownerId} exists
         */
        @PutMapping(value = "/{ownerId}")
        @ResponseStatus(HttpStatus.NO_CONTENT)
        public void updateOwner(@PathVariable("ownerId") @Min(1) int ownerId, @Valid @RequestBody Owner ownerRequest) {
            final Owner ownerModel = ownerRepository.findById(ownerId)
                .orElseThrow(() -> new ResourceNotFoundException("Owner " + ownerId + " not found"));

            // This is done by hand for simplicity purpose. In a real life use-case we should consider using MapStruct.
            ownerModel.setFirstName(ownerRequest.getFirstName());
            ownerModel.setLastName(ownerRequest.getLastName());
            ownerModel.setCity(ownerRequest.getCity());
            ownerModel.setAddress(ownerRequest.getAddress());
            ownerModel.setTelephone(ownerRequest.getTelephone());
            LOGGER.info("Saving owner {}", ownerModel);
            ownerRepository.save(ownerModel);
        }

        /** Publishes the "owner created" message followed by the dummy pet messages. */
        private void publishOwnerCreatedEvents(Owner owner) {
            LOGGER.info("+++Sending events+++");
            emit("New owner created: " + owner.getFirstName() + " " + owner.getLastName() + " with many pets ...");
            for (int i = 0; i < PET_EVENT_COUNT; i++) {
                emit("Pet " + i);
            }
        }

        /**
         * Offers one message to the sink and logs a warning when the sink rejects it.
         * A rejected telemetry message must not fail a request whose owner is already saved.
         */
        private void emit(String payload) {
            final Sinks.EmitResult result = many.tryEmitNext(MessageBuilder.withPayload(payload).build());
            if (result.isFailure()) {
                LOGGER.warn("Could not emit telemetry message '{}': {}", payload, result);
            }
        }
    }
    

    This adds an additional sink, which the createOwner method uses to emit events.

  4. In the /src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/config/ folder, add an extra ManualProducerConfiguration.java file with the below contents:

    package org.springframework.samples.petclinic.customers.config;
       
       
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import com.azure.spring.messaging.AzureHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    import org.springframework.messaging.Message;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
       
    import java.util.function.Consumer;
    import java.util.function.Supplier;
       
    /**
     * Declares the sink that application code emits messages into, and the
     * {@code supply} function bean that exposes those messages as a {@link Flux}.
     */
    @Configuration
    public class ManualProducerConfiguration {

        private static final Logger LOGGER = LoggerFactory.getLogger(ManualProducerConfiguration.class);

        /**
         * Creates the shared sink for outgoing messages.
         *
         * @return a unicast sink created with {@code onBackpressureBuffer()}
         */
        @Bean
        public Sinks.Many<Message<String>> many() {
            final Sinks.UnicastSpec unicast = Sinks.many().unicast();
            return unicast.onBackpressureBuffer();
        }

        /**
         * Exposes the sink's content as a supplier of a message stream.
         *
         * @param many the sink whose emitted messages are supplied
         * @return a supplier that, on each call, returns the sink's flux with logging attached
         */
        @Bean
        public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
            return () -> {
                final Flux<Message<String>> outbound = many.asFlux();
                return outbound
                    .doOnNext(ManualProducerConfiguration::logOutgoing)
                    .doOnError(ManualProducerConfiguration::logFailure);
            };
        }

        /** Logs each message as it passes through the flux. */
        private static void logOutgoing(Message<String> message) {
            LOGGER.info("Manually sending message {}", message);
        }

        /** Logs an error signalled by the flux, including its stack trace. */
        private static void logFailure(Throwable failure) {
            LOGGER.error("Error encountered", failure);
        }
    }
    

    The ManualProducerConfiguration uses the sink to send the events to the event hub.

  5. Update the /src/spring-petclinic-customers-service/src/main/resources/application.yml file, and replace its contents with:

    # Local configuration of the customers service.
    spring:
      application:
        name: customers-service
      config:
        # Import configuration from the config server; "optional:" lets the service start without it.
        import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/}
      cloud:
        function:
          # Name of the function bean to bind (the supply method in ManualProducerConfiguration).
          definition: supply
    

    This adds an extra spring cloud function for the supply method in the ManualProducerConfiguration class.

  6. Save the changes to all 5 files.

  7. In the config repository you will need to add the configuration for sending messages to the event hub. Replace the contents of the current application.yml file with the contents of the 0602_application.yml file. Make sure you fill out your current MySQL server name on line 12. This file includes the following changes:

    • It configures the output stream for supply-out-0 to use the telemetry endpoint of the event hub on line 16.
    • It indicates the namespace you want to connect to on line 25. Make sure to provide here the name of your event hubs namespace.
    • It adds some values for polling on lines 26 to 28.

    Notice that this extra configuration does not include any mention of connection strings, passwords or tokens. The connection is established using the user-assigned managed identity. Getting the token from AAD is handled entirely by the spring-cloud-azure-stream-binder-eventhubs library.

  8. Commit these changes to the config repo.

    git add .
    git commit -m 'added event hub supply'
    git push   
    
  9. Rebuild the customers service.

    mvn clean package -DskipTests -rf :spring-petclinic-customers-service
    
  10. Rebuild the container image and push it to the container registry.

    cd staging-acr
    cp ../spring-petclinic-customers-service/target/spring-petclinic-customers-service-$VERSION.jar spring-petclinic-customers-service-$VERSION.jar
       
    docker build -t $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION \
        --build-arg ARTIFACT_NAME=spring-petclinic-customers-service-$VERSION.jar \
        --build-arg APP_PORT=8080 \
        --build-arg AI_JAR=ai.jar \
        .
          
    docker push $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION
    
  11. Delete the current running customers service pod and double check that it starts running again.

    kubectl get pods
    kubectl delete pod <customers-service-pod>
       
    kubectl get pods -w
       
    kubectl logs <customers-service-pod> -f
    

    You should see in the logs messages related to the connection to the event hub.

    In case you see errors during the startup of the customers service, check whether the error message explains why it is not working as expected, and double check all the above steps. To redeploy the customers service: rebuild the jar file and copy it to the staging-acr directory, rebuild the container image and delete the previously running pod of the customers-service. Additionally, you may delete the running config-server pod before deleting the customers-service pod, so you are sure you are running with the latest config. Also make sure the config-server starts running properly again.

  12. Once you see the customers service properly running again without errors, you can start testing out sending some test events. Navigate in the running petclinic application to Owners - Register. In the new owner screen fill out the form and select Submit.

  13. In the customers service log output, double check that you see messages indicating that events are being sent to the event hub.

  14. In the Azure portal, navigate to your resource group and select the event hub. On the overview screen graphs you should see a peak in requests and messages.

    This peak might not be immediately visible in the graph; if so, wait a minute and refresh the page.