001package com.pingidentity.sync.source; 002 003import com.pingidentity.util.*; 004import com.unboundid.directory.sdk.common.internal.Reconfigurable; 005import com.unboundid.directory.sdk.sync.api.SyncSource; 006import com.unboundid.directory.sdk.sync.config.SyncSourceConfig; 007import com.unboundid.directory.sdk.sync.types.*; 008import com.unboundid.ldap.sdk.ChangeType; 009import com.unboundid.ldap.sdk.DN; 010import com.unboundid.ldap.sdk.Entry; 011import com.unboundid.ldap.sdk.ResultCode; 012import com.unboundid.util.args.*; 013 014import java.io.File; 015import java.io.FileFilter; 016import java.io.IOException; 017import java.io.Serializable; 018import java.nio.file.Files; 019import java.nio.file.LinkOption; 020import java.nio.file.attribute.UserPrincipal; 021import java.util.*; 022import java.util.concurrent.atomic.AtomicLong; 023 024import static com.pingidentity.util.ARG.makePlaceholder; 025import static java.lang.Math.max; 026import static java.lang.Math.min; 027 028/** 029 * This class provides a simple mechanism to inject raw file contents and metadata in Sync pipes 030 */ 031public class RawFiles extends SyncSource implements Reconfigurable<SyncSourceConfig> { 032 private static final String SRC_DIR = "sources/raw"; 033 private static final String ARG_DEFAULT_SRC_DIR = SRC_DIR + "/pending"; 034 private static final String ARG_NAME_SRC_DIR = "src-dir"; 035 private static final String ARG_DEFAULT_DST_DIR = SRC_DIR + "/processed"; 036 private static final String ARG_NAME_DST_DIR = "processed-dir"; 037 private static final String ARG_NAME_KEEP_NO_FILES = "delete-processed-files"; 038 private static final String ARG_NAME_FILE_EXTENSION = "file-extension"; 039 private static final String ARG_NAME_FILE_PREFIX = "file-prefix"; 040 private static final String FILE_ATTACHMENT_KEY = "File.Attachment"; 041 private static final String ARG_NAME_RDN_ATTR = "rdn-attribute"; 042 private static final String RDN_ATTR_DEFAULT = "id"; 043 private static final String ARG_NAME_SORTING_CRITERIA = "sort-by"; 044 private static final String ARG_NAME_REVERSE_SORTING_ORDER = "reverse-sort-order"; 045 public static final String SORT_CRITERIA_MODIFIED = "last-modified"; 046 public static final String SORT_CRITERIA_CREATED = "created"; 047 public static final String SORT_CRITERIA_NAME = "file-name"; 048 public static final String SORT_CRITERIA_RAW = "raw-file"; 049 private FileFilter fileFilter; 050 private Comparator<File> orderingComparator; 051 private File sourceFilesDir; 052 private File processedFilesDir; 053 private Boolean deleteProcessedFiles = true; 054 private SyncServerContext serverContext; 055 private String rdnAttr; 056 private String startpoint = ""; 057 String url = "ldap://raw?initialized=false"; 058 059 /** 060 * @return the extension name 061 */ 062 @Override 063 public String getExtensionName() { 064 return "Raw Files Sync Source"; 065 } 066 067 /** 068 * @return a array of String providing description of the extension 069 */ 070 @Override 071 public String[] getExtensionDescription() { 072 return new String[]{"A sync source to synchronize raw files contents to a destination", 073 "Files are monitored from a local directory and processed as they are detected. " + 074 "It is possible to choose ordering in which files will be added to the sync pipe queue " + 075 "and add a filter to select files with a specific prefix or extension."}; 076 } 077 078 /** 079 * Performs the necessary processing to define the configuration arguments for the extension 080 * @param parser the argument parser 081 * @throws ArgumentException if any argument declaration is incorrect 082 */ 083 @Override 084 public void defineConfigArguments(ArgumentParser parser) throws ArgumentException { 085 List<File> defaultSourceFile = new ArrayList<>(); 086 defaultSourceFile.add(new File(ARG_DEFAULT_SRC_DIR)); 087 parser.addArgument(new FileArgument(ARG.NO_SHORTCUT, ARG_NAME_SRC_DIR, ARG.OPTIONAL, ARG.UNIQUE, "{path}", 088 "The file system directory to monitor for raw files", ARG.FILE_MUST_EXIST, ARG.PARENT_MUST_EXIST, ARG 089 .MAY_NOT_BE_FILE, ARG.MUST_BE_DIRECTORY, defaultSourceFile)); 090 091 List<File> defaultDestinationFile = new ArrayList<>(); 092 defaultDestinationFile.add(new File(ARG_DEFAULT_DST_DIR)); 093 parser.addArgument(new FileArgument(ARG.NO_SHORTCUT, ARG_NAME_DST_DIR, ARG.OPTIONAL, ARG.UNIQUE, "{path}", 094 "The file system directory where the extension should " 095 + "move CSV files after it finishes processing them", ARG.FILE_MUST_EXIST, ARG 096 .PARENT_MUST_EXIST, ARG.MAY_NOT_BE_FILE, ARG.MUST_BE_DIRECTORY, defaultDestinationFile)); 097 098 parser.addArgument(new BooleanArgument(ARG.NO_SHORTCUT, ARG_NAME_KEEP_NO_FILES, 099 "Whether to delete processed files instead of moving them to the processed directory")); 100 101 parser.addArgument(new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_FILE_EXTENSION, ARG.OPTIONAL, ARG.UNIQUE, "{" + ARG_NAME_FILE_EXTENSION + "}", "File extension to include for synchronization")); 102 parser.addArgument(new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_FILE_PREFIX, ARG.OPTIONAL, ARG.UNIQUE, "{" + ARG_NAME_FILE_PREFIX + "}", "File prefix to include for synchronization")); 103 104 StringArgument rdnAttrArg = new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_RDN_ATTR, ARG.OPTIONAL, ARG.UNIQUE, makePlaceholder(ARG_NAME_RDN_ATTR), "Attribute name to use for RDN (Default: " + RDN_ATTR_DEFAULT + ")", RDN_ATTR_DEFAULT); 105 parser.addArgument(rdnAttrArg); 106 107 Set<String> allowedValues = new HashSet<>(); 108 allowedValues.add(SORT_CRITERIA_MODIFIED); 109 allowedValues.add(SORT_CRITERIA_CREATED); 110 allowedValues.add(SORT_CRITERIA_NAME); 111 allowedValues.add(SORT_CRITERIA_RAW); 112 List<String> defaultValues = new ArrayList<>(1); 113 defaultValues.add(SORT_CRITERIA_MODIFIED); 114 parser.addArgument(new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_SORTING_CRITERIA, ARG.OPTIONAL, ARG.UNIQUE, makePlaceholder(ARG_NAME_SORTING_CRITERIA), "The order in which files in the source directory must be ordered for processing", allowedValues, defaultValues)); 115 116 parser.addArgument(new BooleanArgument(ARG.NO_SHORTCUT, ARG_NAME_REVERSE_SORTING_ORDER, "Reverse the sorting order")); 117 } 118 119 /** 120 * Performs the necessary processing to validate the validity of the proposed configuration 121 * @param syncSourceConfig the configuration object to use to verify validity 122 * @param argumentParser the argument parser 123 * @param unacceptableReasons a list of String describing the reasons for which the proposed configuration is not acceptable 124 * @return <code>true</code> is the configuration is found to be acceptable, <code>false</code> otherwise 125 */ 126 @Override 127 public boolean isConfigurationAcceptable(SyncSourceConfig syncSourceConfig, ArgumentParser argumentParser, List<String> unacceptableReasons) { 128 return true; 129 } 130 131 /** 132 * Performs the necessary processing to apply the proposed configuration, either at run time or with a restart of the 133 * SyncSource or the server instance. 134 * @param syncSourceConfig the configuration object to use 135 * @param parser the argument parser from whence to pull the configuration argument values 136 * @param adminActionsRequired a list of String with administrative operations required to complete applying the configuration 137 * @param messages a list of String with message about the processing of applying the configuration 138 * @return <code>ResultCode.SUCCESS</code> 139 */ 140 @Override 141 public ResultCode applyConfiguration(SyncSourceConfig syncSourceConfig, ArgumentParser parser, 142 List<String> adminActionsRequired, List<String> messages) { 143 rdnAttr = parser.getStringArgument(ARG_NAME_RDN_ATTR).getValue(); 144 String fileExtension = parser.getStringArgument(ARG_NAME_FILE_EXTENSION).getValue(); 145 String filePrefix = parser.getStringArgument(ARG_NAME_FILE_PREFIX).getValue(); 146 147 fileFilter = new SourceFileFilter(fileExtension, filePrefix); 148 149 boolean isReversed = parser.getBooleanArgument(ARG_NAME_REVERSE_SORTING_ORDER).isPresent(); 150 String sortingCriteria = parser.getStringArgument(ARG_NAME_SORTING_CRITERIA).getValue(); 151 switch(sortingCriteria){ 152 case SORT_CRITERIA_RAW: 153 orderingComparator = new FileComparator(isReversed); 154 case SORT_CRITERIA_NAME: 155 orderingComparator = new NameComparator(isReversed); 156 case SORT_CRITERIA_CREATED: 157 orderingComparator = new CreateComparator(isReversed); 158 break; 159 default: 160 orderingComparator = new LastModComparator(isReversed); 161 } 162 163 sourceFilesDir = parser.getFileArgument(ARG_NAME_SRC_DIR).getValue(); 164 if (!sourceFilesDir.isAbsolute()) { 165 sourceFilesDir = new File(serverContext.getServerRoot().getPath() + "/" + sourceFilesDir.getPath()); 166 } 167 processedFilesDir = parser.getFileArgument(ARG_NAME_DST_DIR).getValue(); 168 if (!processedFilesDir.isAbsolute()) { 169 processedFilesDir = new File(serverContext.getServerRoot().getPath() + "/" + processedFilesDir.getPath()); 170 } 171 deleteProcessedFiles = parser.getBooleanArgument(ARG_NAME_KEEP_NO_FILES).isPresent(); 172 url = computeUrl(fileExtension, filePrefix); 173 return ResultCode.SUCCESS; 174 } 175 176 /** 177 * Performs the necessary processing to initialize the <code>SyncSource</code> 178 * @param serverContext the server context of the instance of the SyncSource 179 * @param config the configuration object 180 * @param parser the argument parser 181 */ 182 @Override 183 public void initializeSyncSource(SyncServerContext serverContext, SyncSourceConfig config, ArgumentParser parser) { 184 this.serverContext = serverContext; 185 186 this.orderingComparator = new LastModComparator(parser.getBooleanArgument(ARG_NAME_REVERSE_SORTING_ORDER).isPresent()); 187 188 // Use the applyConfiguration method to set up the extension 189 final ArrayList<String> adminActionsRequired = new ArrayList<>(5); 190 final ArrayList<String> messages = new ArrayList<>(5); 191 final ResultCode resultCode = applyConfiguration(config, parser, adminActionsRequired, messages); 192 if (resultCode != ResultCode.SUCCESS) { 193 LOG.out("Error initializing " + config.getConfigObjectName(), this, config, serverContext); 194 for (String m : messages) { 195 LOG.out(m, this, config, serverContext); 196 } 197 } 198 } 199 200 private void addURLParam(StringBuilder sb, String name, String value, boolean append) { 201 if (append) sb.append("&"); 202 sb.append(name); 203 sb.append("="); 204 sb.append(value); 205 } 206 207 /** 208 * Performs the necessary processing to compute and return the URL for the instance of the <code>SyncSource</code> 209 * @return a String with a unique URL for the instance of the extension 210 */ 211 @Override 212 public String getCurrentEndpointURL() { 213 return url; 214 } 215 216 /** 217 * Performs the necessary processing to correctly set the start point of the Sync pipe the instance of the extension 218 * is associated with. 219 * @param setStartpointOptions the <code>SetStartpointOptions</code> to use, such as SetStartpointOptions.BEGINNING_OF_CHANGELOG 220 * @throws EndpointException if an error was encountered in the process of setting the startpoint 221 */ 222 @Override 223 public void setStartpoint(SetStartpointOptions setStartpointOptions) throws EndpointException { 224 // currently only support BEGINNING_OF_CHANGELOG (i.e. process all the files in the source directory) 225 switch (setStartpointOptions.getStartpointType()) { 226 case BEGINNING_OF_CHANGELOG: 227 startpoint = ""; 228 break; 229 case RESUME_AT_CSN: 230 startpoint = setStartpointOptions.getCSN(); 231 break; 232 case RESUME_AT_SERIALIZABLE: 233 Serializable serializableValue = setStartpointOptions.getSerializableValue(); 234 if (serializableValue == null) { 235 throw new IllegalArgumentException("Serializable value must be provided"); 236 } 237 if (!(serializableValue instanceof String)) { 238 throw new IllegalArgumentException("Serializable value must be a String"); 239 } 240 startpoint = (String) serializableValue; 241 break; 242 case END_OF_CHANGELOG: 243 File[] files = getCandidates(); 244 startpoint = files[files.length].getName(); 245 break; 246 case RESUME_AT_CHANGE_TIME: 247 throw new IllegalArgumentException( 248 "This start point type is not supported yet: " 249 + setStartpointOptions.getStartpointType().toString()); 250 default: 251 throw new IllegalArgumentException( 252 "This start point type is not supported: " 253 + setStartpointOptions.getStartpointType().toString()); 254 } 255 } 256 257 /** 258 * @return the startpoint serializanle information for the sync pipe the instance of the extension is associated with 259 */ 260 @Override 261 public Serializable getStartpoint() { 262 return startpoint; 263 } 264 265 266 /** 267 * Performs the necessary processing to retrieve the next batch of files to enqueue in the Sync pipe 268 * @param maxChanges the maximum number of files to enqueue in one call 269 * @param stillPending the remaining number of files still pending after one call has completed 270 * @return 271 * @throws EndpointException if an error was encountered while retrieving the next batch 272 */ 273 @Override 274 public List<ChangeRecord> getNextBatchOfChanges(int maxChanges, AtomicLong stillPending) throws EndpointException { 275 List<ChangeRecord> results = new ArrayList<>(); 276 File[] files = getCandidates(); 277 if (files != null) { 278 int i=0; 279 for (File f: files){ 280 if (i++>maxChanges) 281 break; 282 // bail fast 283 if (f == null) 284 continue; 285 String id = rdnAttr + "=" + f.getName(); 286 ChangeRecord.Builder changeRecordBuilder = new ChangeRecord.Builder(ChangeType.MODIFY, id); 287 changeRecordBuilder.addProperty(FILE_ATTACHMENT_KEY, f); 288 changeRecordBuilder.changeTime(f.lastModified()); 289 try { 290 UserPrincipal owner = Files.getOwner(f.toPath()); 291 changeRecordBuilder.modifier(owner.getName()); 292 } catch (IOException e) { 293 changeRecordBuilder.modifier("unavailable"); 294 } 295 results.add(changeRecordBuilder.build()); 296 } 297 // Set the number of files pending for processing 298 stillPending.set(max(results.size()-maxChanges,0)); 299 } 300 301 return results; 302 } 303 304 /** 305 * Performs the necessary processing to retrieve the file contents and create an entry 306 * @param syncOperation the sync operation to process 307 * @return the <code>Entry</code> with the file contents and metadata 308 * @throws EndpointException if an issue was encountered while processing the <code>SyncOperation</code> 309 */ 310 @Override 311 public Entry fetchEntry(SyncOperation syncOperation) throws EndpointException { 312 Entry entry = null; 313 if (syncOperation == null) 314 return entry; 315 316 ChangeRecord changeRecord = syncOperation.getChangeRecord(); 317 if (changeRecord == null) 318 return entry; 319 320 File f = (File) changeRecord.getProperty(FILE_ATTACHMENT_KEY); 321 if (f == null) 322 return entry; 323 324 DN identifiableInfo = changeRecord.getIdentifiableInfo(); 325 if (identifiableInfo == null) 326 return entry; 327 328 entry = new Entry(identifiableInfo); 329 330 try { 331 byte[] bytes = Files.readAllBytes(f.toPath()); 332 entry.addAttribute("contents", bytes); 333 } catch (IOException e) { 334 entry.addAttribute("contents-error", e.getMessage()); 335 } 336 337 try { 338 Object creationTime = Files.getAttribute(f.toPath(), "creationTime"); 339 if (creationTime != null) { 340 entry.addAttribute("creation-time", Files.getAttribute(f.toPath(), "creationTime").toString()); 341 } 342 } catch (IOException e) { 343 entry.addAttribute("creation-time-error",e.getMessage()); 344 } 345 346 try { 347 Object lastModifiedTime = Files.getAttribute(f.toPath(), "lastModifiedTime"); 348 if (lastModifiedTime != null) { 349 entry.addAttribute("last-modified-time", lastModifiedTime.toString()); 350 } 351 } catch (IOException e) { 352 entry.addAttribute("last-modified-time-error", e.getMessage()); 353 } 354 355 UserPrincipal userPrincipal = null; 356 try { 357 userPrincipal = Files.getOwner(f.toPath(), LinkOption.NOFOLLOW_LINKS); 358 if (userPrincipal != null) { 359 entry.addAttribute("owner", userPrincipal.getName()); 360 } 361 } catch (IOException e) { 362 entry.addAttribute("owner-error", e.getMessage()); 363 } 364 365 try { 366 entry.addAttribute("size", Long.toString(Files.size(f.toPath()))); 367 } catch (IOException e) { 368 entry.addAttribute("size-error", e.getMessage()); 369 } 370 entry.addAttribute("file-name", f.getName()); 371 entry.addAttribute("file-path", f.getPath()); 372 373 return entry; 374 } 375 376 /** 377 * Performs the necessary processing to deal with each completed <code>SyncOperation</code> after it has been 378 * successfully processed 379 * @param completedOperations a List of completed <code>SyncOperation</code> 380 * @throws EndpointException if an error is encountered in the process of acknowledging the completed operations 381 */ 382 @Override 383 public void acknowledgeCompletedOps(LinkedList<SyncOperation> completedOperations) throws EndpointException { 384 for (SyncOperation completedOperation : completedOperations) { 385 File f = (File) completedOperation.getChangeRecord().getProperty(FILE_ATTACHMENT_KEY); 386 startpoint = f.getName(); 387 try { 388 if (deleteProcessedFiles) { 389 deleteCompletedFile(f); 390 } else { 391 moveCompletedFile(f); 392 } 393 } catch (Exception e) { 394 throw new EndpointException(PostStepResult.ABORT_OPERATION, e); 395 } 396 } 397 } 398 399 /** 400 * Peforms the necessary processing to delete a file once an operation has completed 401 * @param srcFile the file to delete 402 * @throws Exception if an error is encountered while deleting the file 403 */ 404 private void deleteCompletedFile(final File srcFile) throws Exception { 405 if (srcFile != null) { 406 if (!srcFile.delete()) { 407 throw new Exception("Could not delete " + srcFile.getName()); 408 } 409 } 410 } 411 412 /** 413 * Performs the necessary processing to move a file once an operation has completed 414 * @param srcFile the file to move 415 * @throws Exception if an error is encountered while moving the file 416 */ 417 private void moveCompletedFile(final File srcFile) throws Exception { 418 final File destFile = new File(processedFilesDir, srcFile.getName()); 419 if (srcFile != null) { 420 if (!srcFile.renameTo(destFile)) { 421 throw new Exception("Could not rename " + srcFile.getName() + " to " + destFile.getName()); 422 } 423 } 424 } 425 426 /** 427 * Performs the necessary processing to retrieve candidate files according to the current configuration of the 428 * instance of the extension 429 * @return an array of <code>File</code> with eligible candidates for processing 430 */ 431 private File[] getCandidates() { 432 File[] files = sourceFilesDir.listFiles(fileFilter); 433 if (files != null) { 434 // Sort the files from oldest to newest 435 Arrays.sort(files, orderingComparator); 436 } 437 return files; 438 } 439 440 /** 441 * Performs the necessary processing to generate a URL for the instance of the <code>SyncSource</code> 442 * @param fileExtension the file extension configured 443 * @param filePrefix the file perfix configured 444 * @return a unique String with the URL for the instance of the extension 445 */ 446 private String computeUrl(final String fileExtension, final String filePrefix) { 447 StringBuilder sb = new StringBuilder(); 448 sb.append("file:///raw?initialized=true"); 449 addURLParam(sb, ARG_NAME_SRC_DIR, sourceFilesDir.getPath(), true); 450 addURLParam(sb, ARG_NAME_KEEP_NO_FILES, deleteProcessedFiles.toString(), true); 451 if (!deleteProcessedFiles) { 452 addURLParam(sb, ARG_NAME_DST_DIR, processedFilesDir.getPath(), true); 453 } 454 if (fileExtension != null) { 455 addURLParam(sb, ARG_NAME_FILE_EXTENSION, fileExtension, true); 456 } 457 if (filePrefix != null) { 458 addURLParam(sb, ARG_NAME_FILE_PREFIX, filePrefix, true); 459 } 460 return sb.toString(); 461 } 462}