001/*
002 * CDDL HEADER START
003 *
004 * The contents of this file are subject to the terms of the
005 * Common Development and Distribution License, Version 1.0 only
006 * (the "License").  You may not use this file except in compliance
007 * with the License.
008 *
009 * You can obtain a copy of the license at
010 * docs/licenses/cddl.txt
011 * or http://www.opensource.org/licenses/cddl1.php.
012 * See the License for the specific language governing permissions
013 * and limitations under the License.
014 *
015 * When distributing Covered Code, include this CDDL HEADER in each
016 * file and include the License file at
017 * docs/licenses/cddl.txt.  If applicable,
018 * add the following below this CDDL HEADER, with the fields enclosed
019 * by brackets "[]" replaced with your own identifying information:
020 *      Portions Copyright [yyyy] [name of copyright owner]
021 *
022 * CDDL HEADER END
023 *
024 *
025 *      Copyright 2011-2018 Ping Identity Corporation
026 */
027package com.pingidentity.sync;
028
029import com.pingidentity.util.MSGraphAPI;
030import com.pingidentity.util.MSGraphResult;
031import com.unboundid.directory.sdk.sync.api.SyncSource;
032import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
033import com.unboundid.directory.sdk.sync.types.*;
034import com.unboundid.ldap.sdk.ChangeType;
035import com.unboundid.ldap.sdk.DN;
036import com.unboundid.ldap.sdk.Entry;
037import com.unboundid.ldap.sdk.LDAPException;
038import com.unboundid.util.args.ArgumentException;
039import com.unboundid.util.args.ArgumentParser;
040import com.unboundid.util.args.StringArgument;
041import com.unboundid.util.json.JSONObject;
042
043import java.io.Serializable;
044import java.io.UnsupportedEncodingException;
045import java.net.URLEncoder;
046import java.util.Iterator;
047import java.util.LinkedList;
048import java.util.List;
049import java.util.concurrent.BlockingQueue;
050import java.util.concurrent.atomic.AtomicLong;
051
052
053/**
054 * This class provides a simple example of a generic Sync Source which will
055 * detect CSV files in a given directory and synchronize them to an
056 * LDAP destination. More specifically, the extension will look for any files
057 * with the ".csv" extension, and interpret each row to be a single LDAP entry
058 * to be synchronized. After a file has been completely processed, it will be
059 * moved to a separate directory.
060 * <p>
061 * The CSV header should contain valid LDAP attribute names. The 'uid' attribute
062 * is used as the RDN and is required to be present, and the source base DN
063 * is configurable using an extension argument. The DNs for the entries will be
064 * constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is
065 * expected to have values for all of the LDAP attributes, not just those that
066 * have changed.
067 * <p>
068 * This extension creates ChangeRecords using {@link ChangeType#MODIFY}, so when
069 * using this example, it's important to set <i>modifies-as-creates=true</i>
070 * on the Sync Class in the configuration. This will allow the Sync Pipe to
071 * create entries that don't already exist. For the sake of simplicity, this
072 * example does not handle DELETE operations.
073 * <p>
074 * This extension uses the OpenCSV library, which is available at
075 * <a href="http://opencsv.sourceforge.net">http://opencsv.sourceforge.net</a>
076 * and provides simple CSV parsing.
077 * <p>
078 * The following arguments are defined:
079 * <UL>
080 * <LI>csv-files-dir -- The filesystem directory to monitor for CSV files</LI>
081 * <LI>processed-files-dir -- The filesystem directory where finished CSV
082 * files should be moved</LI>
083 * <LI>base-dn -- The base DN to use when creating LDAP entries from the CSV
084 * content</LI>
085 * </UL>
086 */
087public final class AzureADSyncSource extends SyncSource {
088    //The server context which can be used for obtaining the server state,
089    //logging, etc.
090    private SyncServerContext serverContext;
091
092
093    private static final String ARG_NAME_TENANT_ID = "tenant-id";
094    private static final String ARG_NAME_CLIENT_ID = "client-id";
095    private static final String ARG_NAME_CLIENT_SECRET = "client-secret";
096    private static final String ARG_NAME_ATTRIBUTES = "attributes";
097    private static final String ARG_NAME_FILTER = "filter";
098
099    private MSGraphAPI msApi;
100
101    /**
102     * The set of attribute names to pull from the AzureAD users call
103     *
104     * Example: {"objectId", "userPrincipalName", "displayName", "givenName", "surname", "mail"}
105     */
106    private String attributeNames = null;
107
108    /**
109     * The filter used to select users from the AzureAD users call
110     */
111    private String filter = null;
112
113    /**
114     * Track a BaseDN that will be used to create the internal sync entry DN
115     * This can be changed with an argument to the plugin
116     */
117    private DN baseDN = null;
118
119
120    /**
121     * Retrieves a human-readable name for this extension.
122     *
123     * @return A human-readable name for this extension.
124     */
125    @Override
126    public String getExtensionName() {
127        return "AzureAD Windows Graph API Sync Source";
128    }
129
130
131    /**
132     * Retrieves a human-readable description for this extension.  Each element
133     * of the array that is returned will be considered a separate paragraph in
134     * generated documentation.
135     *
136     * @return A human-readable description for this extension, or {@code null}
137     * or an empty array if no description should be available.
138     */
139    @Override
140    public String[] getExtensionDescription() {
141        return new String[]{
142                "This extension implements a Sync Source which can be used " +
143                        "to resync users from an AzureAD service."
144        };
145    }
146
147
148    /**
149     * Updates the provided argument parser to define any configuration arguments
150     * which may be used by this extension.  The argument parser may also be
151     * updated to define relationships between arguments (e.g. to specify
152     * required, exclusive, or dependent argument sets).
153     *
154     * @param parser The argument parser to be updated with the configuration
155     *               arguments which may be used by this extension.
156     * @throws ArgumentException If a problem is encountered while updating the
157     *                           provided argument parser.
158     */
159    @Override
160    public void defineConfigArguments(final ArgumentParser parser)
161            throws ArgumentException {
162
163        StringArgument newStringArg;
164
165        newStringArg = new StringArgument(
166                null, ARG_NAME_TENANT_ID, true, 1, "{tenantId}",
167                "The tenantId used to call the AzureAD Graph API");
168        newStringArg.setSensitive(true);
169        parser.addArgument(newStringArg);
170
171        newStringArg = new StringArgument(
172                null, ARG_NAME_CLIENT_ID, true, 1, "{clientId}",
173                "The clientId used to call the AzureAD Graph API");
174        newStringArg.setSensitive(true);
175        parser.addArgument(newStringArg);
176
177        newStringArg = new StringArgument(
178                null, ARG_NAME_CLIENT_SECRET, true, 1, "{clientSecret}",
179                "The clientSecret used in conjunction with the clientId to call the AzureAD Graph API");
180        newStringArg.setSensitive(true);
181        parser.addArgument(newStringArg);
182
183        newStringArg = new StringArgument(
184                null, ARG_NAME_ATTRIBUTES, false, 1, "{attributes}",
185                "The attributes returned from the AzureAD Graph API." +
186                        "  Comma separated list of attribute names (i.e. first,last,address");
187        parser.addArgument(newStringArg);
188
189        newStringArg = new StringArgument(
190                null, ARG_NAME_FILTER, false, 1, "{filter}",
191                "The filter used with the AzureAD Graph API.");
192        parser.addArgument(newStringArg);
193    }
194
195
196    /**
197     * This hook is called when a Sync Pipe first starts up, when the
198     * <i>resync</i> process first starts up, or when the set-startpoint
199     * subcommand is called from the <i>realtime-sync</i> command line tool.
200     * Any initialization of this sync source should be performed here. This
201     * method should generally store the {@link SyncServerContext} in a class
202     * member so that it can be used elsewhere in the implementation.
203     *
204     * @param serverContext A handle to the server context for the server in
205     *                      which this extension is running.
206     * @param config        The general configuration for this sync source.
207     * @param parser        The argument parser which has been initialized from
208     *                      the configuration for this sync source.
209     */
210    @Override
211    public void initializeSyncSource(final SyncServerContext serverContext,
212                                     final SyncSourceConfig config,
213                                     final ArgumentParser parser) {
214        this.serverContext = serverContext;
215
216        StringArgument newStringArg;
217
218        newStringArg =
219                (StringArgument) parser.getNamedArgument(ARG_NAME_TENANT_ID);
220        String tenantId = newStringArg.getValue();
221
222        newStringArg =
223                (StringArgument) parser.getNamedArgument(ARG_NAME_CLIENT_ID);
224        String clientId = newStringArg.getValue();
225
226        newStringArg = (StringArgument) parser.getNamedArgument(ARG_NAME_CLIENT_SECRET);
227        String clientSecret = newStringArg.getValue();
228
229        newStringArg = (StringArgument) parser.getNamedArgument(ARG_NAME_ATTRIBUTES);
230        attributeNames = newStringArg.getValue();
231
232        newStringArg = (StringArgument) parser.getNamedArgument(ARG_NAME_FILTER);
233        filter = newStringArg.getValue();
234
235        msApi = new MSGraphAPI(tenantId, clientId, clientSecret);
236    }
237
238
239    /**
240     * This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
241     * process shuts down, or when the set-startpoint subcommand (from the
242     * <i>realtime-sync</i> command line tool) is finished. Any clean up of this
243     * sync source should be performed here.
244     */
245    @Override
246    public void finalizeSyncSource() {
247        //No implementation necessary.
248    }
249
250
251    /**
252     * This method should effectively set the starting point for synchronization
253     * to the place specified by the <code>options</code> parameter. This should
254     * cause all changes previous to the specified start point to be disregarded
255     * and only changes after that point to be returned by
256     * {@link #getNextBatchOfChanges(int, AtomicLong)}.
257     * <p>
258     * There are several different startpoint types (see
259     * {@link SetStartpointOptions}), and this implementation is not required to
260     * support them all. If the specified startpoint type is unsupported, this
261     * method should throw an {@link UnsupportedOperationException}.
262     * <p>
263     * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
264     * must be supported by your implementation, because this is used when a Sync
265     * Pipe first starts up. The {@link Serializable} in this case is the same
266     * type that is returned by {@link #getStartpoint()}; the
267     * Data Sync Server persists it and passes it back in on a restart.
268     * <p>
269     * This method can be called from two different contexts:
270     * <ul>
271     * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
272     * (the Sync Pipe is required to be stopped in this context)</li>
273     * <li>Immediately after a Sync Pipe starts up and a connection is first
274     * established to the source server (e.g. before the first call to
275     * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
276     * </ul>
277     *
278     * @param options an object which indicates where exactly to start synchronizing
279     *                (e.g. the end of the changelog, specific change number, a certain
280     *                time ago, etc)
281     * @throws EndpointException if there is any error while setting the start point
282     */
283    @Override
284    @SuppressWarnings("unchecked")
285    public void setStartpoint(final SetStartpointOptions options)
286            throws EndpointException {
287
288        throw new IllegalArgumentException("This source doesn't support realtime sync");
289    }
290
291
292    /**
293     * Gets the current value of the startpoint for change detection. This is the
294     * "bookmark" which indicates which changes have already been processed and
295     * which have not. In most cases, a change number is used to detect changes
296     * and is managed by the Data Sync Server, in which case this
297     * implementation needs only to return the latest acknowledged
298     * change number. In other cases, the return value may correspond to a
299     * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
300     * In any case, this method should return the value that is updated by
301     * {@link #acknowledgeCompletedOps(LinkedList)}.
302     * <p>
303     * This method is called periodically and the return value is saved in the
304     * persistent state for the Sync Pipe that uses this extension as its Sync
305     * Source.
306     * <p>
307     * <b>IMPORTANT</b>: The internal value for the startpoint should only be
308     * updated after a sync operation is acknowledged back to this script (via
309     * {@link #acknowledgeCompletedOps(LinkedList)}).
310     * Otherwise it will be possible for changes to be missed when the
311     * Data Sync Server is restarted or a connection error occurs.
312     *
313     * @return a value to store in the persistent state for the Sync Pipe. This is
314     * usually a change number, but if a changelog table is not used to
315     * detect changes, this value should represent some other token to
316     * pass into {@link #setStartpoint(SetStartpointOptions)}
317     * when the sync pipe starts up.
318     */
319    @Override
320    public Serializable getStartpoint() {
321        //We return null here since realtime sync isn't supported for this source
322        return null;
323    }
324
325
326    /**
327     * Return the URL or path identifying the source endpoint
328     * from which this extension is transmitting data. This is used for logging
329     * purposes only, so it could just be a server name or hostname and port, etc.
330     *
331     * @return the path to the source endpoint
332     */
333    @Override
334    public String getCurrentEndpointURL() {
335        return "Not Set - AzureAD URL";
336    }
337
338
339    /**
340     * Return a full source entry (in LDAP form) from the source, corresponding
341     * to the {@link ChangeRecord} that is passed in through the
342     * {@link SyncOperation}. This method should perform any queries necessary to
343     * gather the latest values for all the attributes to be synchronized.
344     * <p>
345     * This method <b>must be thread safe</b>, as it will be called repeatedly and
346     * concurrently by each of the Sync Pipe worker threads as they process
347     * entries.
348     * <p>
349     * If the original ChangeRecord has the full entry already set on it
350     * (which can be done using the
351     * {@link
352     * com.unboundid.directory.sdk.sync.types.ChangeRecord.Builder#fullEntry(Entry)}),
353     * then this method will not get called, and the Data Sync Server
354     * will automatically use the full entry from the ChangeRecord. In this case,
355     * the implementation can always return {@code null}.
356     *
357     * @param operation the SyncOperation which identifies the source "entry" to
358     *                  fetch. The ChangeRecord can be obtained by calling
359     *                  <code>operation.getChangeRecord()</code>.
360     *                  These ChangeRecords are generated by
361     *                  {@link #getNextBatchOfChanges(int, AtomicLong)}
362     *                  or by
363     *                  {@link #listAllEntries(BlockingQueue)}.
364     * @return a full LDAP Entry, or null if no such entry exists.
365     * @throws EndpointException if there is an error fetching the entry
366     */
367    @Override
368    public Entry fetchEntry(final SyncOperation operation)
369            throws EndpointException {
370        ChangeRecord record = operation.getChangeRecord();
371        throw new IllegalStateException(
372                "fetchEntry() should not be called because the full entry is set " +
373                        "on the ChangeRecord: " + record.toString());
374    }
375
376
377    /**
378     * Provides a way for the Data Sync Server to acknowledge back to the
379     * script which sync operations it has processed. This method should update
380     * the official startpoint which was set by
381     * {@link #setStartpoint(SetStartpointOptions)} and is
382     * returned by {@link #getStartpoint()}.
383     * <p>
384     * <b>IMPORTANT</b>: The internal value for the startpoint should only be
385     * updated after a sync operation is acknowledged back to this extension (via
386     * this method). Otherwise it will be possible for changes to be missed when
387     * the Data Sync Server is restarted or a connection error occurs.
388     *
389     * @param completedOps a list of {@link SyncOperation}s that have finished processing.
390     *                     The records are listed in the order they were first detected.
391     * @throws EndpointException if there is an error acknowledging the changes back to the
392     *                           database
393     */
394    @Override
395    public void acknowledgeCompletedOps(
396            final LinkedList<SyncOperation> completedOps)
397            throws EndpointException {
398        throw new IllegalArgumentException("This source doesn't support realtime sync");
399    }
400
401
402    /**
403     * Return the next batch of change records from the source. Change records
404     * are usually just hints that a change happened; they do not include
405     * the full contents of the target entry. In an effort to never synchronize
406     * stale data, the Data Sync Server will go back and fetch the full
407     * target entry for each change record.
408     *
409     * @param maxChanges      the maximum number of changes to retrieve
410     * @param numStillPending this should be set to the number of unretrieved changes that
411     *                        are still pending after this batch has been retrieved. This will
412     *                        be passed in
413     *                        as zero, and may be left that way if the actual value cannot be
414     *                        determined.
415     * @return a list of {@link ChangeRecord} instances, each
416     * corresponding to a single change at the source endpoint.
417     * If there are no new changes to return, this method should return
418     * an empty list.
419     * @throws EndpointException if there is any error while retrieving the next batch of changes
420     */
421    @Override
422    public List<ChangeRecord> getNextBatchOfChanges(
423            final int maxChanges,
424            final AtomicLong numStillPending)
425            throws EndpointException {
426        throw new IllegalArgumentException("This source doesn't support realtime sync");
427    }
428
429
430    /**
431     * Gets a list of all the entries in the source endpoint. This is used by the
432     * 'resync' command line tool.
433     *
434     * @param outputQueue a queue of ChangeRecord objects which will be individually
435     *                    fetched via {@link #fetchEntry(SyncOperation)}
436     * @throws EndpointException if there is an error retrieving the list of entries to resync
437     */
438    @Override
439    public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
440            throws EndpointException {
441        serverContext.debugInfo("Beginning to dump all entries...");
442
443        MSGraphResult graphResult = msApi.getEntries(MSGraphAPI.OBJECT_TYPE.USERS,
444                filter,
445                attributeNames);
446
447        while (graphResult.size() > 0) {
448            List<Entry> entries = graphResult.entries;
449
450            if (entries == null || entries.size() == 0) {
451                break;
452            }
453
454            for (Entry e : entries) {
455                /**
456                 * Now that we have an entry, create a builder and place on the builder queue
457                 */
458                try {
459                    /**
460                     * Let's create a ChangeRecord
461                     */
462                    ChangeRecord.Builder bldr = new ChangeRecord.Builder(
463                            ChangeType.MODIFY, e.getParsedDN());
464
465                    bldr.changeTime(System.currentTimeMillis());
466
467                    /**
468                     * Add the full entry to the ChangeRecord. This will cause the
469                     * Sync engine to skip the call to fetchEntry() and instead use this
470                     * entry.
471                     */
472                    bldr.fullEntry(e);
473
474                    /**
475                     * Put the ChangeRecord onto the outputQueue, to be processed by the sync
476                     * server core
477                     */
478                    outputQueue.put(bldr.build());
479                } catch (LDAPException ex) {
480                    serverContext.debugError("Problem creating the BuildRecord: " + ex.getMessage());
481                } catch (InterruptedException ix) {
482                    serverContext.debugError( "Blocking queue iterrupted: " + ix.getMessage());
483                }
484            }
485
486            /**
487             * Let's get the nextLink data field.  This will provide the URL to use to
488             * retrieve the next page of data.
489             */
490            graphResult = msApi.nextPage(graphResult);
491        }
492
493
494    }
495
496    /**
497     * Gets a list of all the entries in the source from a given file input.
498     * This is used by the 'resync' command line tool.
499     *
500     * @param inputLines  an Iterator containing the lines from the specified input file to
501     *                    resync (this is specified on the CLI for the resync command).
502     *                    These lines can be any format, for example a set of primary keys,
503     *                    a set of WHERE clauses, a set of full SQL queries, etc.
504     * @param outputQueue a queue of ChangeRecord objects which will be individually
505     *                    fetched via {@link #fetchEntry(SyncOperation)}
506     * @throws EndpointException if there is an error retrieving the list of entries to resync
507     */
508    @Override
509    public void listAllEntries(final Iterator<String> inputLines,
510                               final BlockingQueue<ChangeRecord> outputQueue)
511            throws EndpointException {
512        serverContext.debugInfo("Beginning to dump entries from file...");
513
514    }
515
516    /**
517     * Method to encode a string value using the 'UTF-8 encoding scheme.
518     * if the value is null, then we will turn that into an empty string ""
519     *
520     * @param value
521     * @return
522     */
523    // Method to encode a string value using `UTF-8` encoding scheme
524    private static String encodeValue(String value) {
525        if (value == null) {
526            return "";
527        }
528
529        try {
530            return URLEncoder.encode(value, "UTF-8");
531        } catch (UnsupportedEncodingException ex) {
532            throw new RuntimeException(ex.getCause());
533        }
534    }
535
536    /**
537     * Helpef function to add an attribute from a JSONObject into an LDAP Entry.
538     *
539     * @param entry
540     * @param jsonObj
541     * @param attrName
542     */
543    private void addAttrToEntry (final Entry entry, final JSONObject jsonObj, final String attrName) {
544
545        if (jsonObj == null || attrName == null) return;
546
547        String attrVal = jsonObj.getFieldAsString(attrName);
548
549        if (attrVal != null && ! "null".equals(attrVal)) {
550            entry.addAttribute(attrName, attrVal);
551        }
552    }
553}