Description
I'd like to be able to intercept outgoing messages per-binding and attach headers based on the payload. I know this can be done explicitly by operating on Message types in the function arguments and adding headers manually but I'd like to be able to write a library (such as a Spring Boot starter, etc.) that will intercept outgoing messages and add headers based on the payload. Such a library could be included in various projects throughout an organization to automatically add common or standard headers. This could be something like the server name or IP address or could be based on the specific type of the payload if various projects share a common payload type.
Requirements
- Should be able to access the payload (i.e., before content-type conversion) and any existing headers (including any that may have been copied from the input).
- Should be able to determine binding name and access binding properties
- Should work with imperative and reactive functions including multiple outputs (via Reactor tuples) and batching (collection-type input or output)
- Should work with StreamBridge and RoutingFunction
Approaches I've considered
- Declare a MessageConverter bean that is applied before any other converters, adds the necessary headers given the payload, and then delegates to the other converters for actual payload content-type conversion. This approach is nice in that it has access to the payload, works with the various function types, and works with StreamBridge. But the issue is that a MessageConverter has no knowledge of the binding name. It's also a little tricky to ensure the MessageConverter is applied first and then delegates to other converters while avoiding a stack overflow from calling itself again.
- Use a BeanPostProcessor that detects Function (or Supplier) beans and provides replacements composing them with functionality to convert to a Message (if not one already) and add the desired headers. However, extra code is needed to determine the binding name, and if functional composition is used, the processor would want to only apply to the "last" function. It should also only work on function beans that are actual bindings in Spring Cloud Stream (i.e., declared in spring.cloud.function.definition). This can be done but requires some brittle code that duplicates code that is already in SCS, like generating the binding name. The processor also has to handle both reactive and imperative and multiple output functions and doesn't work with StreamBridge. Finally, if the function bean is not just a pure function via a lambda or anonymous type but a concrete type that also implements other interfaces that are required by the application, then just replacing the bean with Function or Supplier won't work and will generate an error if the bean is attempted to be autowired by another supertype or superinterface.
- Use Spring Cloud Function output enrichment functionality described here via an output-header-mapping-expression SpEL expression. But this has similar challenges as the BeanPostProcessor approach, as we'd need to identify which functions are Spring Cloud Stream bindings and the last function in the case of using function composition. Also, this property has to be configured for each function by name, making it challenging to provide a global hook for all functions via a library. Also, the logic for adding a header might be more complex than what can be achieved in a SpEL expression. Finally, this enrichment is accomplished via functional composition under the hood, and currently functional composition does not work for functions with multiple outputs. For this same reason, just using functional composition itself would not work.
- Override the FunctionRegistry bean with one that allows decoration of functions in the register(FunctionRegistration<T>) method by decorating and replacing the function target. This approach is similar to the BeanPostProcessor approach mentioned above except it works better with function beans that are also subtypes of other interfaces, since we are decorating the target in the FunctionRegistration instead of the function bean itself. This approach also works with StreamBridge since it uses a pass-through function under the hood to leverage the same message conversion facilities used by normal function beans. However, this approach still requires some extra code and logic to be able to determine the "last" function in a Spring Cloud Stream composed function (i.e., functionA|functionB) and the output binding name, and it has to explicitly handle the various function types (imperative, reactive, multiple input/output). I configured a BeanPostProcessor to detect BindableFunctionProxyFactory beans and use the mapping in BindableFunctionProxyFactory.getChannelNameToFunctions() to determine which functions are the Spring Cloud Stream outgoing functions and their binding names. This works and is the approach I'm currently using but requires a fair amount of code to piece everything together.
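To illustrate the kind of bookkeeping these approaches force on a library, here is a minimal sketch of finding the "last" function in each configured definition. It assumes only that `;` separates definitions and `|` composes functions in spring.cloud.function.definition; the class and method names are hypothetical, and it does not attempt to reproduce how SCS derives binding names.

```java
import java.util.ArrayList;
import java.util.List;

public class LastFunctionSketch {

    /**
     * Returns the last function name of each definition.
     * Assumption: ';' separates definitions and '|' composes functions.
     */
    static List<String> lastFunctions(String definition) {
        List<String> result = new ArrayList<>();
        for (String single : definition.split(";")) {
            String trimmed = single.trim();
            if (trimmed.isEmpty()) {
                continue; // ignore empty segments such as a trailing ';'
            }
            String[] parts = trimmed.split("\\|");
            result.add(parts[parts.length - 1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [functionB, functionC]
        System.out.println(lastFunctions("functionA|functionB;functionC"));
    }
}
```

Even this small piece duplicates parsing that the framework already does internally, which is part of why these approaches feel brittle.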
Potential solutions
There could be many possible solutions but I think a good solution would be to update Spring Cloud Function input/output enrichment functionality to allow enriching headers via beans implementing some strategy interface that also works with multiple inputs/outputs. An example interface could be:
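import org.springframework.messaging.Message;

public interface MessageInterceptor {

    // Called for each incoming message; returns it unchanged by default.
    default Message<?> incoming(final Message<?> message, final FunctionChannelMetadata metadata) {
        return message;
    }

    // Called for each outgoing message; returns it unchanged by default.
    default Message<?> outgoing(final Message<?> message, final FunctionChannelMetadata metadata) {
        return message;
    }
}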
Implementations could selectively implement the incoming or outgoing methods (or both) and return a different Message instance, for example with added headers, or even replace the message entirely. The FunctionChannelMetadata would be a container of various metadata about the particular function and input/output index, etc.
record FunctionChannelMetadata(String functionName, String functionDefinition, int index) {}
There could be multiple implementations of this in the application, and the standard bean ordering and prioritization mechanisms could be used (e.g., @Order, Ordered, etc.) to determine the sequence in which these interceptors would be applied. Any beans registered in the ApplicationContext would be globally applied to all functions in the registry. In addition to this, interceptors could be added programmatically to a specific function (or composed function) by adding an addInterceptor(MessageInterceptor) method to the FunctionInvocationWrapper returned from the FunctionRegistry that adds the interceptor to the end of the list of interceptors (it might be good to also add a getInterceptors() getter to get a read-only list of the interceptors being applied).
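A rough, self-contained sketch of how such a chain might behave is below. Everything in it is hypothetical: `Msg` is a simplified stand-in for Spring's Message, `InterceptorChain` stands in for the relevant part of FunctionInvocationWrapper, and the header names are made up for illustration. It only shows interceptors being applied in registration order to an outgoing message.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InterceptorChainSketch {

    // Simplified stand-in for org.springframework.messaging.Message (assumption).
    record Msg(Object payload, Map<String, Object> headers) {
        Msg withHeader(String name, Object value) {
            Map<String, Object> copy = new HashMap<>(headers);
            copy.put(name, value);
            return new Msg(payload, Collections.unmodifiableMap(copy));
        }
    }

    record FunctionChannelMetadata(String functionName, String functionDefinition, int index) {}

    interface MessageInterceptor {
        default Msg incoming(Msg message, FunctionChannelMetadata metadata) { return message; }
        default Msg outgoing(Msg message, FunctionChannelMetadata metadata) { return message; }
    }

    // Stand-in for the interceptor handling proposed for FunctionInvocationWrapper.
    static class InterceptorChain {
        private final List<MessageInterceptor> interceptors = new ArrayList<>();

        void addInterceptor(MessageInterceptor interceptor) { interceptors.add(interceptor); }

        List<MessageInterceptor> getInterceptors() {
            return Collections.unmodifiableList(interceptors);
        }

        // Applies every interceptor in order, feeding each result to the next.
        Msg applyOutgoing(Msg message, FunctionChannelMetadata metadata) {
            Msg current = message;
            for (MessageInterceptor interceptor : interceptors) {
                current = interceptor.outgoing(current, metadata);
            }
            return current;
        }
    }

    static Msg demo() {
        InterceptorChain chain = new InterceptorChain();
        // Adds a header derived from the payload type (before any conversion).
        chain.addInterceptor(new MessageInterceptor() {
            @Override
            public Msg outgoing(Msg m, FunctionChannelMetadata md) {
                return m.withHeader("payload-type", m.payload().getClass().getSimpleName());
            }
        });
        // Adds a header derived from the function metadata.
        chain.addInterceptor(new MessageInterceptor() {
            @Override
            public Msg outgoing(Msg m, FunctionChannelMetadata md) {
                return m.withHeader("function", md.functionName());
            }
        });
        FunctionChannelMetadata metadata =
                new FunctionChannelMetadata("functionB", "functionA|functionB", 0);
        return chain.applyOutgoing(new Msg("hello", Map.of()), metadata);
    }

    public static void main(String[] args) {
        System.out.println(demo().headers());
    }
}
```

Because the interceptor sees one message at a time, the same implementation would apply whether the underlying function is imperative, reactive, or has multiple outputs; the framework would be responsible for calling it per message.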
These methods would be called after/before payload type conversion for incoming/outgoing messages, respectively, and should work for imperative and reactive functions and functions with multiple inputs/outputs. The last piece is the most challenging because message enrichment is currently done via function composition, and functional composition is not currently supported for functions with multiple inputs/outputs. So either support needs to be added for that or a different approach will need to be used. The FunctionInvocationWrapper class does have an enhancer that is applied on the output before type conversion. But this enhancer is labeled as internal use only, only works on the output, and is applied at the function level instead of at the message level. I think what we need is something like the current enhancer functionality but applied at the message level, almost like how MessageConverters are applied, but as a distinct and separate mechanism. This MessageInterceptor is applied to the message and not the function, so clients don't need to handle all the different function types (imperative, reactive, etc.). I actually think MessageConverters couple two distinct concepts: creation of a Message<T> with headers, etc., and conversion of the payload via a content-type. I'd argue these probably need to be separate steps with different interfaces. This new MessageInterceptor abstraction is for tweaking the Message in between each function while a MessageConverter is for converting the payload at the incoming and outgoing function.
I realize this change would really be implemented in Spring Cloud Function (SCF) and not Spring Cloud Stream (SCS), so I considered posting an issue there. But my main use case for this is inside Spring Cloud Stream to add headers to outgoing messages (BEFORE type conversion), and I only want to intercept functions that are SCS producer bindings, not all function beans in the application or all those that might be registered in the SCF FunctionRegistry. Also, I have a use case where I'd like to know the binding name and access the properties for the binding (which I can do by injecting the BindingServiceProperties and getting the properties for a binding based on the binding name). So I think this feature would require updates to both SCF and SCS. Once SCF is updated with this type of MessageInterceptor support, SCS would just need an update to add its own MessageInterceptor to the incoming and outgoing messages for the function definitions configured for SCS (and StreamBridge) and add a framework-level header with the binding name. Then, other MessageInterceptors injected by the application could use this header to determine if the function is an SCS function and get the binding name if it's needed (to access properties, etc.). It would be important that this SCS MessageInterceptor is given priority and executed before other interceptors. SCF would naturally apply all MessageInterceptor beans before returning the FunctionInvocationWrapper to SCS, so to allow SCS to add its MessageInterceptor to be run first, the FunctionInvocationWrapper may need separate addFirstInterceptor and addLastInterceptor methods to add to the beginning or end of the list. Then it could make sure the binding name header is added before other interceptors are applied.
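The ordering concern could look roughly like the sketch below. This is illustrative only: the header name `scs_bindingName`, the `Interceptors` class, and the binding name value are all made up, and a "message" is reduced to its header map to keep the example short. The point is just that an interceptor registered later via addFirstInterceptor still runs before one registered earlier via addLastInterceptor.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

public class InterceptorOrderingSketch {

    // Hypothetical header name; the proposal does not specify one.
    static final String BINDING_NAME_HEADER = "scs_bindingName";

    // Stand-in for the interceptor list on FunctionInvocationWrapper.
    // For brevity, a "message" is represented only by its header map.
    static class Interceptors {
        private final Deque<UnaryOperator<Map<String, Object>>> chain = new ArrayDeque<>();

        void addFirstInterceptor(UnaryOperator<Map<String, Object>> i) { chain.addFirst(i); }

        void addLastInterceptor(UnaryOperator<Map<String, Object>> i) { chain.addLast(i); }

        // Applies interceptors from the front of the deque to the back.
        Map<String, Object> apply(Map<String, Object> headers) {
            Map<String, Object> current = headers;
            for (UnaryOperator<Map<String, Object>> i : chain) {
                current = i.apply(current);
            }
            return current;
        }
    }

    // Returns a copy of the headers with one extra entry.
    static Map<String, Object> with(Map<String, Object> headers, String name, Object value) {
        Map<String, Object> copy = new LinkedHashMap<>(headers);
        copy.put(name, value);
        return copy;
    }

    static Map<String, Object> demo() {
        Interceptors interceptors = new Interceptors();
        // Application interceptor, registered first (as SCF would do for beans).
        // It reads the binding name header set by the framework interceptor.
        interceptors.addLastInterceptor(h ->
                with(h, "seen-binding", String.valueOf(h.get(BINDING_NAME_HEADER))));
        // Framework (SCS) interceptor, registered later but placed at the front.
        interceptors.addFirstInterceptor(h ->
                with(h, BINDING_NAME_HEADER, "functionB-out-0"));
        return interceptors.apply(new LinkedHashMap<>());
    }

    public static void main(String[] args) {
        // prints {scs_bindingName=functionB-out-0, seen-binding=functionB-out-0}
        System.out.println(demo());
    }
}
```

If only a single append-style addInterceptor existed, the application interceptor here would run first and see no binding name, which is the situation the two separate methods are meant to avoid.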