diff --git a/src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java index 474cc6f3..be720e25 100644 --- a/src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessor.java @@ -18,6 +18,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * A post processor which uses the channel property *archive* to add pv's to the archiver appliance The archive @@ -48,6 +49,8 @@ public class AAChannelProcessor implements ChannelProcessor { private String archivePropertyName; @Value("${aa.archiver_property_name:archiver}") private String archiverPropertyName; + @Value("${aa.archive_extra_fields_property_name:archive_extra_fields}") + private String archiveExtraFieldsPropertyName; @Value("${aa.auto_pause:}") private List autoPauseOptions; @@ -63,7 +66,8 @@ public String processorInfo() { Map processorProperties = Map.of("archiveProperty", archivePropertyName, "archiverProperty", archiverPropertyName, "Archivers", aaURLs.keySet().toString(), - "AutoPauseOn", autoPauseOptions.toString() + "AutoPauseOn", autoPauseOptions.toString(), + "archiveExtraFieldsProperty", archiveExtraFieldsPropertyName ); return "AAChannelProcessor: ProcessProperties " + processorProperties; } @@ -104,31 +108,21 @@ public long process(List channels) throws JsonProcessingException { channel.getProperties().stream() .filter(xmlProperty -> archiverPropertyName.equalsIgnoreCase(xmlProperty.getName())) .findFirst() - .map(xmlProperty -> { - String archiverValue = xmlProperty.getValue(); - // archiver property can be comma separated list of archivers - if (archiverValue != null && !archiverValue.isEmpty()) { - return Arrays.stream(archiverValue.split(",")) - .map(String::trim) - .filter(s -> !s.isEmpty()); - } else { - return defaultArchivers.stream(); - } - }) + .map(this::getArchiversStrings) .orElse(defaultArchivers.stream()) // Use defaultArchivers list if no matching property found - .forEach(archiverAlias -> { - try { - addChannelChange(channel, archiverAliasToArchivePVOptions, archiversInfo, archiveProperty, archiverAlias); - } catch (Exception e) { - logger.log(Level.WARNING, String.format("Failed to process %s", channel), e); - } - }); + .forEach(archiverAlias -> tryAddChannelChange(channel, archiverAlias, archiverAliasToArchivePVOptions, archiversInfo, archiveProperty)); } else if (autoPauseOptions.contains(archivePropertyName)) { aaURLs.keySet().forEach(archiverAlias -> archiverAliasToArchivePVOptions .get(archiverAlias) - .add(createArchivePV(List.of(), channel, "", PV_STATUS_INACTIVE))); + .add(createArchivePV(List.of(), channel.getName(), "", PV_STATUS_INACTIVE))); } }); + long finalCount = processArchiversAndOptions(archiverAliasToArchivePVOptions, archiversInfo); + logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount)); + return finalCount; + } + + private long processArchiversAndOptions(Map> archiverAliasToArchivePVOptions, Map archiversInfo) throws JsonProcessingException { long count = 0; for (Map.Entry> e : archiverAliasToArchivePVOptions.entrySet()) { ArchiverInfo archiverInfo = archiversInfo.get(e.getKey()); @@ -142,9 +136,27 @@ public long process(List channels) throws JsonProcessingException { getArchiveActions(archivePVSList, archiverInfo); count += archiverClient.configureAA(archiveActionArchivePVMap, archiverInfo.url()); } - long finalCount = count; - logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount)); - return finalCount; + return count; + } + + private void tryAddChannelChange(Channel channel, String archiverAlias, Map> archiverAliasToArchivePVOptions, Map archiversInfo, Optional archiveProperty) { + try { + addChannelChange(channel, archiverAliasToArchivePVOptions, archiversInfo, archiveProperty, archiverAlias); + } catch (Exception e) { + logger.log(Level.WARNING, String.format("Failed to process %s", channel), e); + } + } + + private Stream getArchiversStrings(Property xmlProperty) { + String archiverValue = xmlProperty.getValue(); + // archiver property can be comma separated list of archivers + if (archiverValue != null && !archiverValue.isEmpty()) { + return Arrays.stream(archiverValue.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()); + } else { + return defaultArchivers.stream(); + } } private void addChannelChange(Channel channel, @@ -157,16 +169,34 @@ private void addChannelChange(Channel channel, .findFirst() .map(Property::getValue) .orElse(PV_STATUS_INACTIVE); + String pvStatusAutoPause = autoPauseOptions.contains(PV_STATUS_PROPERTY_NAME) ? pvStatus : PV_STATUS_ACTIVE; + List extraFields = channel.getProperties().stream() + .filter(property -> archiveExtraFieldsPropertyName.equalsIgnoreCase(property.getName())) + .findFirst() + .map(Property::getValue) + .map(AAChannelProcessor::parseExtraFieldsProperty) + .orElse(List.of()); if (aaArchivePVS.containsKey(archiverAlias) && archiveProperty.isPresent()) { ArchivePVOptions newArchiverPV = createArchivePV( archiversInfo.get(archiverAlias).policies(), - channel, + channel.getName(), archiveProperty.get().getValue(), - autoPauseOptions.contains(PV_STATUS_PROPERTY_NAME) ? pvStatus : PV_STATUS_ACTIVE); + pvStatusAutoPause); + List extraFieldsPVs = extraFields.stream().map(f -> createArchivePV( + archiversInfo.get(archiverAlias).policies(), + String.format("%s.%s", channel.getName(), f), + archiveProperty.get().getValue(), + pvStatusAutoPause) + ).toList(); aaArchivePVS.get(archiverAlias).add(newArchiverPV); + aaArchivePVS.get(archiverAlias).addAll(extraFieldsPVs); } } + public static List parseExtraFieldsProperty(String s) { + return Arrays.stream(s.split(" ")).toList(); + } + private ArchiveAction pickArchiveAction(String archiveStatus, String pvStatus) { if (archiveStatus.equals("Being archived") && (pvStatus.equals(PV_STATUS_INACTIVE))) { return ArchiveAction.PAUSE; @@ -216,12 +246,12 @@ private Map> getArchiveActions( } private ArchivePVOptions createArchivePV( - List policyList, Channel channel, String archiveProperty, String pvStaus) { + List policyList, String channelName, String archiveProperty, String pvStaus) { ArchivePVOptions newArchiverPV = new ArchivePVOptions(); - if (aaPVA && !channel.getName().contains("://")) { - newArchiverPV.setPv("pva://" + channel.getName()); + if (aaPVA && !channelName.contains("://")) { + newArchiverPV.setPv("pva://" + channelName); } else { - newArchiverPV.setPv(channel.getName()); + newArchiverPV.setPv(channelName); } newArchiverPV.setSamplingParameters(archiveProperty, policyList); newArchiverPV.setPvStatus(pvStaus); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 6b891fb1..4ec38022 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -122,6 +122,7 @@ aa.enabled=true aa.pva=false aa.archive_property_name=archive aa.archiver_property_name=archiver +aa.archive_extra_fields_property_name=archive_extra_fields # Set the auto pause behaviour # diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorExtraFieldsIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorExtraFieldsIT.java new file mode 100644 index 00000000..44189da4 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorExtraFieldsIT.java @@ -0,0 +1,95 @@ +package org.phoebus.channelfinder.processors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.processors.aa.AAChannelProcessor; +import org.phoebus.channelfinder.processors.aa.ArchiveAction; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; +import org.springframework.test.context.TestPropertySource; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.activeProperty; +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.archiveProperty; +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.inactiveProperty; +import static org.phoebus.channelfinder.processors.AAChannelProcessorIT.paramableAAChannelProcessorTest; +import static org.phoebus.channelfinder.processors.AAChannelProcessorMultiIT.NOT_BEING_ARCHIVED; +import static org.phoebus.channelfinder.processors.AAChannelProcessorMultiIT.OWNER; + +@WebMvcTest(AAChannelProcessor.class) +@TestPropertySource(locations = "classpath:application_test.properties") +class AAChannelProcessorExtraFieldsIT { + protected static Property extraFieldsProperty = new Property("archive_extra_fields", "owner", "a b c"); + @Autowired + AAChannelProcessor aaChannelProcessor; + + MockWebServer mockArchiverAppliance; + ObjectMapper objectMapper; + + @BeforeEach + void setUp() throws IOException { + mockArchiverAppliance = new MockWebServer(); + mockArchiverAppliance.start(17665); + + objectMapper = new ObjectMapper(); + } + + @AfterEach + void teardown() throws IOException { + mockArchiverAppliance.shutdown(); + } + + static Stream provideArguments() { + List channels = List.of( + new Channel("PVNoneActive", OWNER, List.of(archiveProperty, activeProperty, extraFieldsProperty), List.of()) + ); + + Map namesToStatuses = Map.of( + "PVNoneActive", NOT_BEING_ARCHIVED, + "PVNoneActive.a", NOT_BEING_ARCHIVED, + "PVNoneActive.b", NOT_BEING_ARCHIVED, + "PVNoneActive.c", NOT_BEING_ARCHIVED + ); + Map> actionsToNames = Map.of( + ArchiveAction.ARCHIVE, List.of("PVNoneActive", "PVNoneActive.a", "PVNoneActive.b", "PVNoneActive.c") + ); + int expectedProcessedChannels = 4; + + return Stream.of( + Arguments.of(channels, namesToStatuses, actionsToNames, expectedProcessedChannels)); + + } + + @ParameterizedTest + @MethodSource("provideArguments") + void testProcessNotArchivedActive( + List channels, + Map namesToStatuses, + Map> actionsToNames, + int expectedProcessedChannels) + throws JsonProcessingException, InterruptedException { + AAChannelProcessorMultiIT.paramableMultiAAChannelProcessorTest( + mockArchiverAppliance, + objectMapper, + aaChannelProcessor, + channels, + namesToStatuses, + actionsToNames, + expectedProcessedChannels + ); + } +} diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java index 32ae1387..cbbedda2 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorMultiIT.java @@ -111,6 +111,20 @@ void testProcessMulti(List channels, Map> actionsToNames, int expectedProcessedChannels) throws JsonProcessingException, InterruptedException { + paramableMultiAAChannelProcessorTest(mockArchiverAppliance, objectMapper, aaChannelProcessor, + channels, namesToStatuses, actionsToNames, expectedProcessedChannels + ); + } + + static void paramableMultiAAChannelProcessorTest( + MockWebServer mockArchiverAppliance, + ObjectMapper objectMapper, + AAChannelProcessor aaChannelProcessor, + List channels, + Map namesToStatuses, + Map> actionsToNames, + int expectedProcessedChannels) + throws JsonProcessingException, InterruptedException { // Request to version Map versions = Map.of("mgmt_version", "Archiver Appliance Version 1.1.0"); @@ -194,7 +208,7 @@ void testProcessMulti(List channels, } - public ArchiveAction actionFromEndpoint(final String endpoint) { + public static ArchiveAction actionFromEndpoint(final String endpoint) { for (ArchiveAction action : ArchiveAction.values()) { if (action.getEndpoint().equals(endpoint)) { return action; diff --git a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTest.java b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTest.java index 5403c477..b2b41ab2 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTest.java +++ b/src/test/java/org/phoebus/channelfinder/processors/AAChannelProcessorTest.java @@ -13,19 +13,28 @@ import java.util.List; import java.util.stream.Stream; +import org.phoebus.channelfinder.processors.aa.AAChannelProcessor; import org.phoebus.channelfinder.processors.aa.ArchivePVOptions; class AAChannelProcessorTest { + public static final String SIM_TESTING_POLICY = "sim://testingPolicy"; + public static final String POLICY_FAST = "Fast"; + public static final String POLICY_SLOW_CONTROLLED = "SlowControlled"; + public static final String SAMPLING_METHOD_SCAN = "SCAN"; + public static final String POLICY_FAST_CONTROLLED = "FastControlled"; + public static final String POLICY_SLOW = "Slow"; + public static final String SAMPLING_METHOD_MONITOR = "MONITOR"; + @Test void archivePropertyParsePass() { ArchivePVOptions ar = new ArchivePVOptions(); - List testPolicyList = Arrays.asList("Fast", "FastControlled", "Slow", "SlowControlled"); + List testPolicyList = Arrays.asList(POLICY_FAST, POLICY_FAST_CONTROLLED, POLICY_SLOW, POLICY_SLOW_CONTROLLED); ar.setPv("sim://testing"); ar.setSamplingParameters("monitor@1.0", testPolicyList); Assertions.assertEquals(ar.getPv(), "sim://testing"); - Assertions.assertEquals(ar.getSamplingMethod(), "MONITOR"); + Assertions.assertEquals(ar.getSamplingMethod(), SAMPLING_METHOD_MONITOR); Assertions.assertEquals(ar.getSamplingPeriod(), "1.0"); } @@ -41,7 +50,7 @@ void testArchiveProperty(String parameters, List policyList, } private static Stream provideArchivePropertyArguments() { - List testPolicyList = Arrays.asList("Fast", "FastControlled", "Slow", "SlowControlled"); + List testPolicyList = Arrays.asList(POLICY_FAST, POLICY_FAST_CONTROLLED, POLICY_SLOW, POLICY_SLOW_CONTROLLED); return Stream.of( // Failure Arguments.of("@blah", testPolicyList, null, null), @@ -55,21 +64,21 @@ private static Stream provideArchivePropertyArguments() { Arguments.of("INVALID", testPolicyList, null, null), Arguments.of("MMMONITOR@10.0", testPolicyList, null, null), // Success - Arguments.of("MONITOR@0.01 ignore me", testPolicyList, "MONITOR", "0.01"), - Arguments.of("MONITOR@1 ignore me", testPolicyList, "MONITOR", "1"), - Arguments.of("MONITOR@0.01 ignore me", testPolicyList, "MONITOR", "0.01"), - Arguments.of("ScAn@10.01000", testPolicyList, "SCAN", "10.01000"), - Arguments.of("MONITOR@0.01", testPolicyList, "MONITOR", "0.01"), - Arguments.of("MONITOR@1", testPolicyList, "MONITOR", "1"), - Arguments.of("scan@.01", testPolicyList, "SCAN", ".01"), - Arguments.of("scan@1.01", testPolicyList, "SCAN", "1.01") + Arguments.of("MONITOR@0.01 ignore me", testPolicyList, SAMPLING_METHOD_MONITOR, "0.01"), + Arguments.of("MONITOR@1 ignore me", testPolicyList, SAMPLING_METHOD_MONITOR, "1"), + Arguments.of("MONITOR@0.01 ignore me", testPolicyList, SAMPLING_METHOD_MONITOR, "0.01"), + Arguments.of("ScAn@10.01000", testPolicyList, SAMPLING_METHOD_SCAN, "10.01000"), + Arguments.of("MONITOR@0.01", testPolicyList, SAMPLING_METHOD_MONITOR, "0.01"), + Arguments.of("MONITOR@1", testPolicyList, SAMPLING_METHOD_MONITOR, "1"), + Arguments.of("scan@.01", testPolicyList, SAMPLING_METHOD_SCAN, ".01"), + Arguments.of("scan@1.01", testPolicyList, SAMPLING_METHOD_SCAN, "1.01") ); } @Test void defaultArchiveTag() { ArchivePVOptions ar = new ArchivePVOptions(); - List testPolicyList = Arrays.asList("Fast", "FastControlled", "Slow", "SlowControlled"); + List testPolicyList = Arrays.asList(POLICY_FAST, POLICY_FAST_CONTROLLED, POLICY_SLOW, POLICY_SLOW_CONTROLLED); ar.setPv("sim://testing"); ar.setSamplingParameters("default", testPolicyList); @@ -82,23 +91,27 @@ void defaultArchiveTag() { @Test void archivePolicyParsing() { ArchivePVOptions ar = new ArchivePVOptions(); - ar.setPv("sim://testingPolicy"); - ar.setPolicy("Fast"); + ar.setPv(SIM_TESTING_POLICY); + ar.setPolicy(POLICY_FAST); - Assertions.assertEquals(ar.getPv(), "sim://testingPolicy"); + Assertions.assertEquals(ar.getPv(), SIM_TESTING_POLICY); Assertions.assertNull(ar.getSamplingMethod()); Assertions.assertNull(ar.getSamplingPeriod()); - Assertions.assertEquals(ar.getPolicy(), "Fast"); + Assertions.assertEquals(ar.getPolicy(), POLICY_FAST); - ar.setPolicy("SlowControlled"); + ar.setPolicy(POLICY_SLOW_CONTROLLED); Assertions.assertNull(ar.getSamplingMethod()); Assertions.assertNull(ar.getSamplingPeriod()); - Assertions.assertEquals(ar.getPolicy(), "SlowControlled"); + Assertions.assertEquals(ar.getPolicy(), POLICY_SLOW_CONTROLLED); ar.setSamplingParameters("scan@60", new ArrayList<>()); - Assertions.assertEquals(ar.getSamplingMethod(), "SCAN"); + Assertions.assertEquals(ar.getSamplingMethod(), SAMPLING_METHOD_SCAN); Assertions.assertEquals(ar.getSamplingPeriod(), "60"); - Assertions.assertEquals(ar.getPolicy(), "SlowControlled"); + Assertions.assertEquals(ar.getPolicy(), POLICY_SLOW_CONTROLLED); + } + @Test + void archiveExtraFieldParsing() { + Assertions.assertEquals(List.of("a", "b", "c"), AAChannelProcessor.parseExtraFieldsProperty("a b c")); } @Test @@ -126,12 +139,12 @@ void archivePVJson() throws JsonProcessingException { Assertions.assertEquals(str, expectedString); // Test policies - List testPolicyList = Arrays.asList("Fast", "FastControlled", "Slow", "SlowControlled"); + List testPolicyList = Arrays.asList(POLICY_FAST, POLICY_FAST_CONTROLLED, POLICY_SLOW, POLICY_SLOW_CONTROLLED); // Valid policy ArchivePVOptions ar4 = new ArchivePVOptions(); ar4.setPv("sim://testing4"); - ar4.setSamplingParameters("Fast", testPolicyList); + ar4.setSamplingParameters(POLICY_FAST, testPolicyList); str = objectMapper.writeValueAsString(List.of(ar4)); expectedString = "[{\"pv\":\"sim://testing4\",\"policy\":\"Fast\"}]";