001package com.pingidentity.sync.source;
002
003import com.pingidentity.util.*;
004import com.unboundid.directory.sdk.common.internal.Reconfigurable;
005import com.unboundid.directory.sdk.sync.api.SyncSource;
006import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
007import com.unboundid.directory.sdk.sync.types.*;
008import com.unboundid.ldap.sdk.ChangeType;
009import com.unboundid.ldap.sdk.DN;
010import com.unboundid.ldap.sdk.Entry;
011import com.unboundid.ldap.sdk.ResultCode;
012import com.unboundid.util.args.*;
013
014import java.io.File;
015import java.io.FileFilter;
016import java.io.IOException;
017import java.io.Serializable;
018import java.nio.file.Files;
019import java.nio.file.LinkOption;
020import java.nio.file.attribute.UserPrincipal;
021import java.util.*;
022import java.util.concurrent.atomic.AtomicLong;
023
024import static com.pingidentity.util.ARG.makePlaceholder;
025import static java.lang.Math.max;
026import static java.lang.Math.min;
027
028/**
029 * This class provides a simple mechanism to inject raw file contents and metadata in Sync pipes
030 */
031public class RawFiles extends SyncSource implements Reconfigurable<SyncSourceConfig> {
032    private static final String SRC_DIR = "sources/raw";
033    private static final String ARG_DEFAULT_SRC_DIR = SRC_DIR + "/pending";
034    private static final String ARG_NAME_SRC_DIR = "src-dir";
035    private static final String ARG_DEFAULT_DST_DIR = SRC_DIR + "/processed";
036    private static final String ARG_NAME_DST_DIR = "processed-dir";
037    private static final String ARG_NAME_KEEP_NO_FILES = "delete-processed-files";
038    private static final String ARG_NAME_FILE_EXTENSION = "file-extension";
039    private static final String ARG_NAME_FILE_PREFIX = "file-prefix";
040    private static final String FILE_ATTACHMENT_KEY = "File.Attachment";
041    private static final String ARG_NAME_RDN_ATTR = "rdn-attribute";
042    private static final String RDN_ATTR_DEFAULT = "id";
043    private static final String ARG_NAME_SORTING_CRITERIA = "sort-by";
044    private static final String ARG_NAME_REVERSE_SORTING_ORDER = "reverse-sort-order";
045    public static final String SORT_CRITERIA_MODIFIED = "last-modified";
046    public static final String SORT_CRITERIA_CREATED = "created";
047    public static final String SORT_CRITERIA_NAME = "file-name";
048    public static final String SORT_CRITERIA_RAW = "raw-file";
049    private FileFilter fileFilter;
050    private Comparator<File> orderingComparator;
051    private File sourceFilesDir;
052    private File processedFilesDir;
053    private Boolean deleteProcessedFiles = true;
054    private SyncServerContext serverContext;
055    private String rdnAttr;
056    private String startpoint = "";
057    String url = "ldap://raw?initialized=false";
058
059    /**
060     * @return the extension name
061     */
062    @Override
063    public String getExtensionName() {
064        return "Raw Files Sync Source";
065    }
066
067    /**
068     * @return a array of String providing description of the extension
069     */
070    @Override
071    public String[] getExtensionDescription() {
072        return new String[]{"A sync source to synchronize raw files contents to a destination",
073                "Files are monitored from a local directory and processed as they are detected. " +
074                        "It is possible to choose ordering in which files will be added to the sync pipe queue " +
075                        "and add a filter to select files with a specific prefix or extension."};
076    }
077
078    /**
079     * Performs the necessary processing to define the configuration arguments for the extension
080     * @param parser the argument parser
081     * @throws ArgumentException if any argument declaration is incorrect
082     */
083    @Override
084    public void defineConfigArguments(ArgumentParser parser) throws ArgumentException {
085        List<File> defaultSourceFile = new ArrayList<>();
086        defaultSourceFile.add(new File(ARG_DEFAULT_SRC_DIR));
087        parser.addArgument(new FileArgument(ARG.NO_SHORTCUT, ARG_NAME_SRC_DIR, ARG.OPTIONAL, ARG.UNIQUE, "{path}",
088                "The file system directory to monitor for raw files", ARG.FILE_MUST_EXIST, ARG.PARENT_MUST_EXIST, ARG
089                .MAY_NOT_BE_FILE, ARG.MUST_BE_DIRECTORY, defaultSourceFile));
090
091        List<File> defaultDestinationFile = new ArrayList<>();
092        defaultDestinationFile.add(new File(ARG_DEFAULT_DST_DIR));
093        parser.addArgument(new FileArgument(ARG.NO_SHORTCUT, ARG_NAME_DST_DIR, ARG.OPTIONAL, ARG.UNIQUE, "{path}",
094                "The file system directory where the extension should "
095                        + "move CSV files after it finishes processing them", ARG.FILE_MUST_EXIST, ARG
096                .PARENT_MUST_EXIST, ARG.MAY_NOT_BE_FILE, ARG.MUST_BE_DIRECTORY, defaultDestinationFile));
097
098        parser.addArgument(new BooleanArgument(ARG.NO_SHORTCUT, ARG_NAME_KEEP_NO_FILES,
099                "Whether to delete processed files instead of moving them to the processed directory"));
100
101        parser.addArgument(new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_FILE_EXTENSION, ARG.OPTIONAL, ARG.UNIQUE, "{" + ARG_NAME_FILE_EXTENSION + "}", "File extension to include for synchronization"));
102        parser.addArgument(new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_FILE_PREFIX, ARG.OPTIONAL, ARG.UNIQUE, "{" + ARG_NAME_FILE_PREFIX + "}", "File prefix to include for synchronization"));
103
104        StringArgument rdnAttrArg = new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_RDN_ATTR, ARG.OPTIONAL, ARG.UNIQUE, makePlaceholder(ARG_NAME_RDN_ATTR), "Attribute name to use for RDN (Default: " + RDN_ATTR_DEFAULT + ")", RDN_ATTR_DEFAULT);
105        parser.addArgument(rdnAttrArg);
106
107        Set<String> allowedValues = new HashSet<>();
108        allowedValues.add(SORT_CRITERIA_MODIFIED);
109        allowedValues.add(SORT_CRITERIA_CREATED);
110        allowedValues.add(SORT_CRITERIA_NAME);
111        allowedValues.add(SORT_CRITERIA_RAW);
112        List<String> defaultValues = new ArrayList<>(1);
113        defaultValues.add(SORT_CRITERIA_MODIFIED);
114        parser.addArgument(new StringArgument(ARG.NO_SHORTCUT, ARG_NAME_SORTING_CRITERIA, ARG.OPTIONAL, ARG.UNIQUE, makePlaceholder(ARG_NAME_SORTING_CRITERIA), "The order in which files in the source directory must be ordered for processing", allowedValues, defaultValues));
115
116        parser.addArgument(new BooleanArgument(ARG.NO_SHORTCUT, ARG_NAME_REVERSE_SORTING_ORDER, "Reverse the sorting order"));
117    }
118
119    /**
120     * Performs the necessary processing to validate the validity of the proposed configuration
121     * @param syncSourceConfig the configuration object to use to verify validity
122     * @param argumentParser the argument parser
123     * @param unacceptableReasons a list of String describing the reasons for which the proposed configuration is not acceptable
124     * @return <code>true</code> is the configuration is found to be acceptable, <code>false</code> otherwise
125     */
126    @Override
127    public boolean isConfigurationAcceptable(SyncSourceConfig syncSourceConfig, ArgumentParser argumentParser, List<String> unacceptableReasons) {
128        return true;
129    }
130
131    /**
132     * Performs the necessary processing to apply the proposed configuration, either at run time or with a restart of the
133     *  SyncSource or the server instance.
134     * @param syncSourceConfig the configuration object to use
135     * @param parser the argument parser from whence to pull the configuration argument values
136     * @param adminActionsRequired a list of String with administrative operations required to complete applying the configuration
137     * @param messages a list of String with message about the processing of applying the configuration
138     * @return <code>ResultCode.SUCCESS</code>
139     */
140    @Override
141    public ResultCode applyConfiguration(SyncSourceConfig syncSourceConfig, ArgumentParser parser,
142                                         List<String> adminActionsRequired, List<String> messages) {
143        rdnAttr = parser.getStringArgument(ARG_NAME_RDN_ATTR).getValue();
144        String fileExtension = parser.getStringArgument(ARG_NAME_FILE_EXTENSION).getValue();
145        String filePrefix = parser.getStringArgument(ARG_NAME_FILE_PREFIX).getValue();
146
147        fileFilter = new SourceFileFilter(fileExtension, filePrefix);
148
149        boolean isReversed = parser.getBooleanArgument(ARG_NAME_REVERSE_SORTING_ORDER).isPresent();
150        String sortingCriteria = parser.getStringArgument(ARG_NAME_SORTING_CRITERIA).getValue();
151        switch(sortingCriteria){
152            case SORT_CRITERIA_RAW:
153                orderingComparator = new FileComparator(isReversed);
154            case SORT_CRITERIA_NAME:
155                orderingComparator = new NameComparator(isReversed);
156            case SORT_CRITERIA_CREATED:
157                orderingComparator = new CreateComparator(isReversed);
158                break;
159            default:
160                orderingComparator = new LastModComparator(isReversed);
161        }
162
163        sourceFilesDir = parser.getFileArgument(ARG_NAME_SRC_DIR).getValue();
164        if (!sourceFilesDir.isAbsolute()) {
165            sourceFilesDir = new File(serverContext.getServerRoot().getPath() + "/" + sourceFilesDir.getPath());
166        }
167        processedFilesDir = parser.getFileArgument(ARG_NAME_DST_DIR).getValue();
168        if (!processedFilesDir.isAbsolute()) {
169            processedFilesDir = new File(serverContext.getServerRoot().getPath() + "/" + processedFilesDir.getPath());
170        }
171        deleteProcessedFiles = parser.getBooleanArgument(ARG_NAME_KEEP_NO_FILES).isPresent();
172        url = computeUrl(fileExtension, filePrefix);
173        return ResultCode.SUCCESS;
174    }
175
176    /**
177     * Performs the necessary processing to initialize the <code>SyncSource</code>
178     * @param serverContext the server context of the instance of the SyncSource
179     * @param config the configuration object
180     * @param parser the argument parser
181     */
182    @Override
183    public void initializeSyncSource(SyncServerContext serverContext, SyncSourceConfig config, ArgumentParser parser) {
184        this.serverContext = serverContext;
185
186        this.orderingComparator = new LastModComparator(parser.getBooleanArgument(ARG_NAME_REVERSE_SORTING_ORDER).isPresent());
187
188        // Use the applyConfiguration method to set up the extension
189        final ArrayList<String> adminActionsRequired = new ArrayList<>(5);
190        final ArrayList<String> messages = new ArrayList<>(5);
191        final ResultCode resultCode = applyConfiguration(config, parser, adminActionsRequired, messages);
192        if (resultCode != ResultCode.SUCCESS) {
193            LOG.out("Error initializing " + config.getConfigObjectName(), this, config, serverContext);
194            for (String m : messages) {
195                LOG.out(m, this, config, serverContext);
196            }
197        }
198    }
199
200    private void addURLParam(StringBuilder sb, String name, String value, boolean append) {
201        if (append) sb.append("&");
202        sb.append(name);
203        sb.append("=");
204        sb.append(value);
205    }
206
207    /**
208     * Performs the necessary processing to compute and return the URL for the instance of the <code>SyncSource</code>
209     * @return a String with a unique URL for the instance of the extension
210     */
211    @Override
212    public String getCurrentEndpointURL() {
213        return url;
214    }
215
216    /**
217     * Performs the necessary processing to correctly set the start point of the Sync pipe the instance of the extension
218     * is associated with.
219     * @param setStartpointOptions the <code>SetStartpointOptions</code> to use, such as SetStartpointOptions.BEGINNING_OF_CHANGELOG
220     * @throws EndpointException if an error was encountered in the process of setting the startpoint
221     */
222    @Override
223    public void setStartpoint(SetStartpointOptions setStartpointOptions) throws EndpointException {
224        // currently only support BEGINNING_OF_CHANGELOG (i.e. process all the files in the source directory)
225        switch (setStartpointOptions.getStartpointType()) {
226            case BEGINNING_OF_CHANGELOG:
227                startpoint = "";
228                break;
229            case RESUME_AT_CSN:
230                startpoint = setStartpointOptions.getCSN();
231                break;
232            case RESUME_AT_SERIALIZABLE:
233                Serializable serializableValue = setStartpointOptions.getSerializableValue();
234                if (serializableValue == null) {
235                    throw new IllegalArgumentException("Serializable value must be provided");
236                }
237                if (!(serializableValue instanceof String)) {
238                    throw new IllegalArgumentException("Serializable value must be a String");
239                }
240                startpoint = (String) serializableValue;
241                break;
242            case END_OF_CHANGELOG:
243                File[] files = getCandidates();
244                startpoint = files[files.length].getName();
245                break;
246            case RESUME_AT_CHANGE_TIME:
247                throw new IllegalArgumentException(
248                        "This start point type is not supported yet: "
249                                + setStartpointOptions.getStartpointType().toString());
250            default:
251                throw new IllegalArgumentException(
252                        "This start point type is not supported: "
253                                + setStartpointOptions.getStartpointType().toString());
254        }
255    }
256
257    /**
258     * @return the startpoint serializanle information for the sync pipe the instance of the extension is associated with
259     */
260    @Override
261    public Serializable getStartpoint() {
262        return startpoint;
263    }
264
265
266    /**
267     * Performs the necessary processing to retrieve the next batch of files to enqueue in the Sync pipe
268     * @param maxChanges the maximum number of files to enqueue in one call
269     * @param stillPending the remaining number of files still pending after one call has completed
270     * @return
271     * @throws EndpointException if an error was encountered while retrieving the next batch
272     */
273    @Override
274    public List<ChangeRecord> getNextBatchOfChanges(int maxChanges, AtomicLong stillPending) throws EndpointException {
275        List<ChangeRecord> results = new ArrayList<>();
276        File[] files = getCandidates();
277        if (files != null) {
278            int i=0;
279            for (File f: files){
280                if (i++>maxChanges)
281                    break;
282                // bail fast
283                if (f == null)
284                    continue;
285                String id = rdnAttr + "=" + f.getName();
286                ChangeRecord.Builder changeRecordBuilder = new ChangeRecord.Builder(ChangeType.MODIFY, id);
287                changeRecordBuilder.addProperty(FILE_ATTACHMENT_KEY, f);
288                changeRecordBuilder.changeTime(f.lastModified());
289                try {
290                    UserPrincipal owner = Files.getOwner(f.toPath());
291                    changeRecordBuilder.modifier(owner.getName());
292                } catch (IOException e) {
293                    changeRecordBuilder.modifier("unavailable");
294                }
295                results.add(changeRecordBuilder.build());
296            }
297            // Set the number of files pending for processing
298            stillPending.set(max(results.size()-maxChanges,0));
299        }
300
301        return results;
302    }
303
304    /**
305     * Performs the necessary processing to retrieve the file contents and create an entry
306     * @param syncOperation the sync operation to process
307     * @return the <code>Entry</code> with the file contents and metadata
308     * @throws EndpointException if an issue was encountered while processing the <code>SyncOperation</code>
309     */
310    @Override
311    public Entry fetchEntry(SyncOperation syncOperation) throws EndpointException {
312        Entry entry = null;
313        if (syncOperation == null)
314            return entry;
315
316        ChangeRecord changeRecord = syncOperation.getChangeRecord();
317        if (changeRecord == null)
318            return entry;
319
320        File f = (File) changeRecord.getProperty(FILE_ATTACHMENT_KEY);
321        if (f == null)
322            return entry;
323
324        DN identifiableInfo = changeRecord.getIdentifiableInfo();
325        if (identifiableInfo == null)
326            return entry;
327
328        entry = new Entry(identifiableInfo);
329
330        try {
331            byte[] bytes = Files.readAllBytes(f.toPath());
332            entry.addAttribute("contents", bytes);
333        } catch (IOException e) {
334            entry.addAttribute("contents-error", e.getMessage());
335        }
336
337        try {
338            Object creationTime = Files.getAttribute(f.toPath(), "creationTime");
339            if (creationTime != null) {
340                entry.addAttribute("creation-time", Files.getAttribute(f.toPath(), "creationTime").toString());
341            }
342        } catch (IOException e) {
343            entry.addAttribute("creation-time-error",e.getMessage());
344        }
345
346        try {
347            Object lastModifiedTime = Files.getAttribute(f.toPath(), "lastModifiedTime");
348            if (lastModifiedTime != null) {
349                entry.addAttribute("last-modified-time", lastModifiedTime.toString());
350            }
351        } catch (IOException e) {
352            entry.addAttribute("last-modified-time-error", e.getMessage());
353        }
354
355        UserPrincipal userPrincipal = null;
356        try {
357            userPrincipal = Files.getOwner(f.toPath(), LinkOption.NOFOLLOW_LINKS);
358            if (userPrincipal != null) {
359                entry.addAttribute("owner", userPrincipal.getName());
360            }
361        } catch (IOException e) {
362            entry.addAttribute("owner-error", e.getMessage());
363        }
364
365        try {
366            entry.addAttribute("size", Long.toString(Files.size(f.toPath())));
367        } catch (IOException e) {
368            entry.addAttribute("size-error", e.getMessage());
369        }
370        entry.addAttribute("file-name", f.getName());
371        entry.addAttribute("file-path", f.getPath());
372
373        return entry;
374    }
375
376    /**
377     * Performs the necessary processing to deal with each completed <code>SyncOperation</code> after it has been
378     * successfully processed
379     * @param completedOperations a List of completed <code>SyncOperation</code>
380     * @throws EndpointException if an error is encountered in the process of acknowledging the completed operations
381     */
382    @Override
383    public void acknowledgeCompletedOps(LinkedList<SyncOperation> completedOperations) throws EndpointException {
384        for (SyncOperation completedOperation : completedOperations) {
385            File f = (File) completedOperation.getChangeRecord().getProperty(FILE_ATTACHMENT_KEY);
386            startpoint = f.getName();
387            try {
388                if (deleteProcessedFiles) {
389                    deleteCompletedFile(f);
390                } else {
391                    moveCompletedFile(f);
392                }
393            } catch (Exception e) {
394                throw new EndpointException(PostStepResult.ABORT_OPERATION, e);
395            }
396        }
397    }
398
399    /**
400     * Peforms the necessary processing to delete a file once an operation has completed
401     * @param srcFile the file to delete
402     * @throws Exception if an error is encountered while deleting the file
403     */
404    private void deleteCompletedFile(final File srcFile) throws Exception {
405        if (srcFile != null) {
406            if (!srcFile.delete()) {
407                throw new Exception("Could not delete " + srcFile.getName());
408            }
409        }
410    }
411
412    /**
413     * Performs the necessary processing to move a file once an operation has completed
414     * @param srcFile the file to move
415     * @throws Exception if an error is encountered while moving the file
416     */
417    private void moveCompletedFile(final File srcFile) throws Exception {
418        final File destFile = new File(processedFilesDir, srcFile.getName());
419        if (srcFile != null) {
420            if (!srcFile.renameTo(destFile)) {
421                throw new Exception("Could not rename " + srcFile.getName() + " to " + destFile.getName());
422            }
423        }
424    }
425
426    /**
427     * Performs the necessary processing to retrieve candidate files according to the current configuration of the
428     * instance of the extension
429     * @return an array of <code>File</code> with eligible candidates for processing
430     */
431    private File[] getCandidates() {
432        File[] files = sourceFilesDir.listFiles(fileFilter);
433        if (files != null) {
434            // Sort the files from oldest to newest
435            Arrays.sort(files, orderingComparator);
436        }
437        return files;
438    }
439
440    /**
441     * Performs the necessary processing to generate a URL for the instance of the <code>SyncSource</code>
442     * @param fileExtension the file extension configured
443     * @param filePrefix the file perfix configured
444     * @return a unique String with the URL for the instance of the extension
445     */
446    private String computeUrl(final String fileExtension, final String filePrefix) {
447        StringBuilder sb = new StringBuilder();
448        sb.append("file:///raw?initialized=true");
449        addURLParam(sb, ARG_NAME_SRC_DIR, sourceFilesDir.getPath(), true);
450        addURLParam(sb, ARG_NAME_KEEP_NO_FILES, deleteProcessedFiles.toString(), true);
451        if (!deleteProcessedFiles) {
452            addURLParam(sb, ARG_NAME_DST_DIR, processedFilesDir.getPath(), true);
453        }
454        if (fileExtension != null) {
455            addURLParam(sb, ARG_NAME_FILE_EXTENSION, fileExtension, true);
456        }
457        if (filePrefix != null) {
458            addURLParam(sb, ARG_NAME_FILE_PREFIX, filePrefix, true);
459        }
460        return sb.toString();
461    }
462}