Update an existing microservice to send events to the Event Hub
You will now implement the functionality that will allow you to emulate sending events to the telemetry Event Hub. For this you will update the customers-service
. Each time a new customer gets created, send a new event to the event hub, together with 100 new pet events (we are going to simulate some dummy load).
- Sending and Receiving Message by Azure Event Hubs and Spring Cloud Stream Binder Eventhubs in Spring Boot Application
- Spring Cloud Stream with Azure Event Hubs
Step by step guidance
-
In the
customers-service
pom.xml
file, add the following extra dependency:<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId> </dependency>
-
Replace the contents of the src/main/java/org/springframework/samples/petclinic/customers/CustomersServiceApplication.java file with:
package org.springframework.samples.petclinic.customers; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author Maciej Szarlinski */ @EnableDiscoveryClient @SpringBootApplication public class CustomersServiceApplication { private static final Logger LOGGER = LoggerFactory.getLogger(CustomersServiceApplication.class); public static void main(String[] args) { SpringApplication.run(CustomersServiceApplication.class, args); } @ServiceActivator(inputChannel = "telemetry.errors") public void producerError(Message<?> message) { LOGGER.error("Handling Producer ERROR: " + message); } }
This adds an extra logger and a method that will be called in case of errors in the sending of events.
-
Replace the contents of the src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/web/OwnerResource.java file with:
package org.springframework.samples.petclinic.customers.web; import io.micrometer.core.annotation.Timed; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.samples.petclinic.customers.model.Owner; import org.springframework.samples.petclinic.customers.model.OwnerRepository; import org.springframework.web.bind.annotation.*; import jakarta.validation.Valid; import jakarta.validation.constraints.Min; import java.util.List; import java.util.Optional; import reactor.core.publisher.Sinks; import reactor.core.publisher.Flux; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import java.util.function.Supplier; import org.springframework.beans.factory.annotation.Autowired; /** * @author Juergen Hoeller * @author Ken Krebs * @author Arjen Poutsma * @author Michael Isvy * @author Maciej Szarlinski */ @RequestMapping("/owners") @RestController @Timed("petclinic.owner") @RequiredArgsConstructor @Slf4j class OwnerResource { private final OwnerRepository ownerRepository; @Autowired private Sinks.Many<Message<String>> many; private static final Logger LOGGER = LoggerFactory.getLogger(OwnerResource.class); /** * Create Owner */ @PostMapping @ResponseStatus(HttpStatus.CREATED) public Owner createOwner(@Valid @RequestBody Owner owner) { LOGGER.info("+++Sending events+++"); many.emitNext(MessageBuilder.withPayload("New owner created: " + owner.getFirstName() + " " + owner.getLastName() + " with many pets ...").build(), Sinks. EmitFailureHandler.FAIL_FAST); for(int i = 0; i < 100; i++) { many.emitNext(MessageBuilder.withPayload("Pet " + i).build(), Sinks.EmitFailureHandler.FAIL_FAST); } return ownerRepository.save(owner); } /** * Read single Owner */ @GetMapping(value = "/{ownerId}") public Optional<Owner> findOwner(@PathVariable("ownerId") @Min(1) int ownerId) { return ownerRepository.findById(ownerId); } /** * Read List of Owners */ @GetMapping public List<Owner> findAll() { return ownerRepository.findAll(); } /** * Update Owner */ @PutMapping(value = "/{ownerId}") @ResponseStatus(HttpStatus.NO_CONTENT) public void updateOwner(@PathVariable("ownerId") @Min(1) int ownerId, @Valid @RequestBody Owner ownerRequest) { final Optional<Owner> owner = ownerRepository.findById(ownerId); final Owner ownerModel = owner.orElseThrow(() -> new ResourceNotFoundException("Owner "+ownerId+" not found")); // This is done by hand for simplicity purpose. In a real life use-case we should consider using MapStruct. ownerModel.setFirstName(ownerRequest.getFirstName()); ownerModel.setLastName(ownerRequest.getLastName()); ownerModel.setCity(ownerRequest.getCity()); ownerModel.setAddress(ownerRequest.getAddress()); ownerModel.setTelephone(ownerRequest.getTelephone()); log.info("Saving owner {}", ownerModel); ownerRepository.save(ownerModel); } }
This adds an additional sync, that is used in the
createOwner
method to add events to. -
In the /src/spring-petclinic-customers-service/src/main/java/org/springframework/samples/petclinic/customers/config/ folder, add an extra
ManualProducerConfiguration.java
file with the below contents:package org.springframework.samples.petclinic.customers.config; import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders; import com.azure.spring.messaging.AzureHeaders; import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.messaging.Message; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; @Configuration public class ManualProducerConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(ManualProducerConfiguration.class); @Bean public Sinks.Many<Message<String>> many() { return Sinks.many().unicast().onBackpressureBuffer(); } @Bean public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) { return () -> many.asFlux() .doOnNext(m -> LOGGER.info("Manually sending message {}", m)) .doOnError(t -> LOGGER.error("Error encountered", t)); } }
The
ManualProducerConfiguration
uses the sync to send the events to the event hub. -
Update the /src/spring-petclinic-customers-service/src/main/resources/application.yml file, and replace its contents with:
spring: application: name: customers-service config: import: optional:configserver:${CONFIG_SERVER_URL:http://localhost:8888/} cloud: function: supply;
This adds an extra spring cloud function for the supply method in the
ManualProducerConfiguration
class. -
Save the changes to all 4 files.
-
In the config repository you will need to add the configuration for sending messages to the event hub. Replace the contents of the current
application.yml
file with the contents of the 0602_application.yml file. Make sure you fill out your current MySQL server name on line12
. This file includes the following changes:- It configures the output stream for
supply-out-0
to use the telemetry endpoint of the event hub on line16
. - It indicates the namespace you want to connect to on line
25
. Make sure to provide here the name of your event hubs namespace. - It adds some values for polling on lines
26
to28
.
Notice that this extra configuration does not include any mention to connection strings, passwords or tokens. The connection will happen based on the user assigned managed identity. Getting the token from AAD is all provided by the
spring-cloud-azure-stream-binder-eventhubs
library. - It configures the output stream for
-
Commit these changes to the config repo.
git add . git commit -m 'added event hub supply' git push
-
Rebuild the customers service.
mvn clean package -DskipTests -rf :spring-petclinic-customers-service
-
Rebuild the container image and push it to the container registry.
cd staging-acr cp ../spring-petclinic-customers-service/target/spring-petclinic-customers-service-$VERSION.jar spring-petclinic-customers-service-$VERSION.jar docker build -t $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION \ --build-arg ARTIFACT_NAME=spring-petclinic-customers-service-$VERSION.jar \ --build-arg APP_PORT=8080 \ --build-arg AI_JAR=ai.jar \ . docker push $MYACR.azurecr.io/spring-petclinic-customers-service:$VERSION
-
Delete the current running customers service pod and double check that it starts running again.
kubectl get pods kubectl delete pod <customers-service-pod> kubectl get pods -w kubectl logs <customers-service-pod> -f
You should see in the logs messages related to the connection to the event hub.
In case you see errors during the startup of the customers service, double check the error message whether it provides info on why it’s not working as expected. Double check all the above steps. To redeploy the customers service: rebuild the jar file and copy it to the staging-acr directory, rebuild the container image and stop the previously running pod of the customers-service. Additionally you may also delete the previous version of the config-server, before deleting the customers-service pod, so you are sure you are running with latest config. Also make sure the config-server starts running properly again as well.
-
Once you see the customers service properly running again without errors, you can start testing out sending some test events. Navigate in the running petclinic application to Owners - Register. In the new owner screen fill out the form and select Submit.
-
In the customers service log output, double check that you see messages indicating that events are being send to the event hub.
-
In the Azure portal, navigate to your resource group and select the event hub. On the overview screen graphs you should see a peak in requests and messages.
It might be that this peak in the graph is not immediately visible, wait for a minute and refresh the page.