Class AzureADSyncSource

  • All Implemented Interfaces:
    com.unboundid.directory.sdk.common.internal.Configurable, com.unboundid.directory.sdk.common.internal.ExampleUsageProvider, com.unboundid.directory.sdk.common.internal.UnboundIDExtension

    public final class AzureADSyncSource
    extends com.unboundid.directory.sdk.sync.api.SyncSource
    This class provides a simple example of a generic Sync Source which will detect CSV files in a given directory and synchronize them to an LDAP destination. More specifically, the extension will look for any files with the ".csv" extension, and interpret each row to be a single LDAP entry to be synchronized. After a file has been completely processed, it will be moved to a separate directory.

    The CSV header should contain valid LDAP attribute names. The 'uid' attribute is used as the RDN and is required to be present, and the source base DN is configurable using an extension argument. The DNs for the entries will be constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is expected to have values for all of the LDAP attributes, not just those that have changed.

    This extension creates ChangeRecords using ChangeType.MODIFY, so when using this example, it's important to set modifies-as-creates=true on the Sync Class in the configuration. This will allow the Sync Pipe to create entries that don't already exist. For the sake of simplicity, this example does not handle DELETE operations.

    This extension uses the OpenCSV library, which is available at http://opencsv.sourceforge.net and provides simple CSV parsing.

    The following arguments are defined:

    • csv-files-dir -- The filesystem directory to monitor for CSV files
    • processed-files-dir -- The filesystem directory where finished CSV files should be moved
    • base-dn -- The base DN to use when creating LDAP entries from the CSV content
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void acknowledgeCompletedOps​(LinkedList<com.unboundid.directory.sdk.sync.types.SyncOperation> completedOps)
      Provides a way for the Data Sync Server to acknowledge back to the script which sync operations it has processed.
      void defineConfigArguments​(com.unboundid.util.args.ArgumentParser parser)
      Updates the provided argument parser to define any configuration arguments which may be used by this extension.
      com.unboundid.ldap.sdk.Entry fetchEntry​(com.unboundid.directory.sdk.sync.types.SyncOperation operation)
      Return a full source entry (in LDAP form) from the source, corresponding to the ChangeRecord that is passed in through the SyncOperation.
      void finalizeSyncSource()
      This hook is called when a Sync Pipe shuts down, when the resync process shuts down, or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished.
      String getCurrentEndpointURL()
      Return the URL or path identifying the source endpoint from which this extension is transmitting data.
      String[] getExtensionDescription()
      Retrieves a human-readable description for this extension.
      String getExtensionName()
      Retrieves a human-readable name for this extension.
      List<com.unboundid.directory.sdk.sync.types.ChangeRecord> getNextBatchOfChanges​(int maxChanges, AtomicLong numStillPending)
      Return the next batch of change records from the source.
      Serializable getStartpoint()
      Gets the current value of the startpoint for change detection.
      void initializeSyncSource​(com.unboundid.directory.sdk.sync.types.SyncServerContext serverContext, com.unboundid.directory.sdk.sync.config.SyncSourceConfig config, com.unboundid.util.args.ArgumentParser parser)
      This hook is called when a Sync Pipe first starts up, when the resync process first starts up, or when the set-startpoint subcommand is called from the realtime-sync command line tool.
      void listAllEntries​(BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue)
      Gets a list of all the entries in the source endpoint.
      void listAllEntries​(Iterator<String> inputLines, BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue)
      Gets a list of all the entries in the source from a given file input.
      void setStartpoint​(com.unboundid.directory.sdk.sync.types.SetStartpointOptions options)
      This method should effectively set the starting point for synchronization to the place specified by the options parameter.
      • Methods inherited from class com.unboundid.directory.sdk.sync.api.SyncSource

        getExamplesArgumentSets
    • Method Detail

      • getExtensionName

        public String getExtensionName()
        Retrieves a human-readable name for this extension.
        Specified by:
        getExtensionName in interface com.unboundid.directory.sdk.common.internal.UnboundIDExtension
        Specified by:
        getExtensionName in class com.unboundid.directory.sdk.sync.api.SyncSource
        Returns:
        A human-readable name for this extension.
      • getExtensionDescription

        public String[] getExtensionDescription()
        Retrieves a human-readable description for this extension. Each element of the array that is returned will be considered a separate paragraph in generated documentation.
        Specified by:
        getExtensionDescription in interface com.unboundid.directory.sdk.common.internal.UnboundIDExtension
        Specified by:
        getExtensionDescription in class com.unboundid.directory.sdk.sync.api.SyncSource
        Returns:
        A human-readable description for this extension, or null or an empty array if no description should be available.
      • defineConfigArguments

        public void defineConfigArguments​(com.unboundid.util.args.ArgumentParser parser)
                                   throws com.unboundid.util.args.ArgumentException
        Updates the provided argument parser to define any configuration arguments which may be used by this extension. The argument parser may also be updated to define relationships between arguments (e.g. to specify required, exclusive, or dependent argument sets).
        Specified by:
        defineConfigArguments in interface com.unboundid.directory.sdk.common.internal.Configurable
        Overrides:
        defineConfigArguments in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        parser - The argument parser to be updated with the configuration arguments which may be used by this extension.
        Throws:
        com.unboundid.util.args.ArgumentException - If a problem is encountered while updating the provided argument parser.
      • initializeSyncSource

        public void initializeSyncSource​(com.unboundid.directory.sdk.sync.types.SyncServerContext serverContext,
                                         com.unboundid.directory.sdk.sync.config.SyncSourceConfig config,
                                         com.unboundid.util.args.ArgumentParser parser)
        This hook is called when a Sync Pipe first starts up, when the resync process first starts up, or when the set-startpoint subcommand is called from the realtime-sync command line tool. Any initialization of this sync source should be performed here. This method should generally store the SyncServerContext in a class member so that it can be used elsewhere in the implementation.
        Overrides:
        initializeSyncSource in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        serverContext - A handle to the server context for the server in which this extension is running.
        config - The general configuration for this sync source.
        parser - The argument parser which has been initialized from the configuration for this sync source.
      • finalizeSyncSource

        public void finalizeSyncSource()
        This hook is called when a Sync Pipe shuts down, when the resync process shuts down, or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished. Any clean up of this sync source should be performed here.
        Overrides:
        finalizeSyncSource in class com.unboundid.directory.sdk.sync.api.SyncSource
      • setStartpoint

        public void setStartpoint​(com.unboundid.directory.sdk.sync.types.SetStartpointOptions options)
                           throws com.unboundid.directory.sdk.sync.types.EndpointException
        This method should effectively set the starting point for synchronization to the place specified by the options parameter. This should cause all changes previous to the specified start point to be disregarded and only changes after that point to be returned by getNextBatchOfChanges(int, AtomicLong).

        There are several different startpoint types (see SetStartpointOptions), and this implementation is not required to support them all. If the specified startpoint type is unsupported, this method should throw an UnsupportedOperationException.

        IMPORTANT: The RESUME_AT_SERIALIZABLE startpoint type must be supported by your implementation, because this is used when a Sync Pipe first starts up. The Serializable in this case is the same type that is returned by getStartpoint(); the Data Sync Server persists it and passes it back in on a restart.

        This method can be called from two different contexts:

        • When the 'set-startpoint' subcommand of the realtime-sync CLI is used (the Sync Pipe is required to be stopped in this context)
        • Immediately after a Sync Pipe starts up and a connection is first established to the source server (e.g. before the first call to getNextBatchOfChanges(int, AtomicLong))
        Specified by:
        setStartpoint in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        options - an object which indicates where exactly to start synchronizing (e.g. the end of the changelog, specific change number, a certain time ago, etc)
        Throws:
        com.unboundid.directory.sdk.sync.types.EndpointException - if there is any error while setting the start point
      • getStartpoint

        public Serializable getStartpoint()
        Gets the current value of the startpoint for change detection. This is the "bookmark" which indicates which changes have already been processed and which have not. In most cases, a change number is used to detect changes and is managed by the Data Sync Server, in which case this implementation needs only to return the latest acknowledged change number. In other cases, the return value may correspond to a different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. In any case, this method should return the value that is updated by acknowledgeCompletedOps(LinkedList).

        This method is called periodically and the return value is saved in the persistent state for the Sync Pipe that uses this extension as its Sync Source.

        IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this script (via acknowledgeCompletedOps(LinkedList)). Otherwise it will be possible for changes to be missed when the Data Sync Server is restarted or a connection error occurs.

        Specified by:
        getStartpoint in class com.unboundid.directory.sdk.sync.api.SyncSource
        Returns:
        a value to store in the persistent state for the Sync Pipe. This is usually a change number, but if a changelog table is not used to detect changes, this value should represent some other token to pass into setStartpoint(SetStartpointOptions) when the sync pipe starts up.
      • getCurrentEndpointURL

        public String getCurrentEndpointURL()
        Return the URL or path identifying the source endpoint from which this extension is transmitting data. This is used for logging purposes only, so it could just be a server name or hostname and port, etc.
        Specified by:
        getCurrentEndpointURL in class com.unboundid.directory.sdk.sync.api.SyncSource
        Returns:
        the path to the source endpoint
      • fetchEntry

        public com.unboundid.ldap.sdk.Entry fetchEntry​(com.unboundid.directory.sdk.sync.types.SyncOperation operation)
                                                throws com.unboundid.directory.sdk.sync.types.EndpointException
        Return a full source entry (in LDAP form) from the source, corresponding to the ChangeRecord that is passed in through the SyncOperation. This method should perform any queries necessary to gather the latest values for all the attributes to be synchronized.

        This method must be thread safe, as it will be called repeatedly and concurrently by each of the Sync Pipe worker threads as they process entries.

        If the original ChangeRecord has the full entry already set on it (which can be done using the ChangeRecord.Builder.fullEntry(Entry)), then this method will not get called, and the Data Sync Server will automatically use the full entry from the ChangeRecord. In this case, the implementation can always return null.

        Specified by:
        fetchEntry in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        operation - the SyncOperation which identifies the source "entry" to fetch. The ChangeRecord can be obtained by calling operation.getChangeRecord(). These ChangeRecords are generated by getNextBatchOfChanges(int, AtomicLong) or by listAllEntries(BlockingQueue).
        Returns:
        a full LDAP Entry, or null if no such entry exists.
        Throws:
        com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error fetching the entry
      • acknowledgeCompletedOps

        public void acknowledgeCompletedOps​(LinkedList<com.unboundid.directory.sdk.sync.types.SyncOperation> completedOps)
                                     throws com.unboundid.directory.sdk.sync.types.EndpointException
        Provides a way for the Data Sync Server to acknowledge back to the script which sync operations it has processed. This method should update the official startpoint which was set by setStartpoint(SetStartpointOptions) and is returned by getStartpoint().

        IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via this method). Otherwise it will be possible for changes to be missed when the Data Sync Server is restarted or a connection error occurs.

        Specified by:
        acknowledgeCompletedOps in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        completedOps - a list of SyncOperations that have finished processing. The records are listed in the order they were first detected.
        Throws:
        com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error acknowledging the changes back to the database
      • getNextBatchOfChanges

        public List<com.unboundid.directory.sdk.sync.types.ChangeRecord> getNextBatchOfChanges​(int maxChanges,
                                                                                               AtomicLong numStillPending)
                                                                                        throws com.unboundid.directory.sdk.sync.types.EndpointException
        Return the next batch of change records from the source. Change records are usually just hints that a change happened; they do not include the full contents of the target entry. In an effort to never synchronize stale data, the Data Sync Server will go back and fetch the full target entry for each change record.
        Specified by:
        getNextBatchOfChanges in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        maxChanges - the maximum number of changes to retrieve
        numStillPending - this should be set to the number of unretrieved changes that are still pending after this batch has been retrieved. This will be passed in as zero, and may be left that way if the actual value cannot be determined.
        Returns:
        a list of ChangeRecord instances, each corresponding to a single change at the source endpoint. If there are no new changes to return, this method should return an empty list.
        Throws:
        com.unboundid.directory.sdk.sync.types.EndpointException - if there is any error while retrieving the next batch of changes
      • listAllEntries

        public void listAllEntries​(BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue)
                            throws com.unboundid.directory.sdk.sync.types.EndpointException
        Gets a list of all the entries in the source endpoint. This is used by the 'resync' command line tool.
        Overrides:
        listAllEntries in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        outputQueue - a queue of ChangeRecord objects which will be individually fetched via fetchEntry(SyncOperation)
        Throws:
        com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error retrieving the list of entries to resync
      • listAllEntries

        public void listAllEntries​(Iterator<String> inputLines,
                                   BlockingQueue<com.unboundid.directory.sdk.sync.types.ChangeRecord> outputQueue)
                            throws com.unboundid.directory.sdk.sync.types.EndpointException
        Gets a list of all the entries in the source from a given file input. This is used by the 'resync' command line tool.
        Overrides:
        listAllEntries in class com.unboundid.directory.sdk.sync.api.SyncSource
        Parameters:
        inputLines - an Iterator containing the lines from the specified input file to resync (this is specified on the CLI for the resync command). These lines can be any format, for example a set of primary keys, a set of WHERE clauses, a set of full SQL queries, etc.
        outputQueue - a queue of ChangeRecord objects which will be individually fetched via fetchEntry(SyncOperation)
        Throws:
        com.unboundid.directory.sdk.sync.types.EndpointException - if there is an error retrieving the list of entries to resync