Computer science thoughts

Using multi-threading in Talend


Threads spread a processing load over several processing units. Talend can launch sub-jobs in multiple threads so that they run in parallel. Here is one way to set this up.

Creating the multi-threaded job

First isolate the processing that will run in a thread, so that it can execute independently. Create a job for it and take particular care with concurrency between the future threads: do not write to shared resources (files, the same rows of a database table, etc.).
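One way to follow this rule is to derive each output from the thread number, so that every thread owns its resource. The plain-Java sketch below runs outside Talend and is only an illustration: the class name, the `import_thread_N.csv` naming pattern and the temporary directory are hypothetical. Three threads each write to their own file, so no file is ever opened by two threads at once.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OneFilePerThread {

    // Hypothetical naming rule: the thread number is part of the file name,
    // so two threads never write to the same file.
    static Path outputFile(Path dir, int numThread) {
        return dir.resolve("import_thread_" + numThread + ".csv");
    }

    // Starts nbThreads tasks; each one writes only to its own file.
    static List<Path> writeInParallel(Path dir, int nbThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nbThreads);
        try {
            List<Future<Path>> futures = new ArrayList<>();
            for (int num = 1; num <= nbThreads; num++) {
                final int numThread = num;
                futures.add(pool.submit(() -> {
                    Path file = outputFile(dir, numThread);
                    Files.write(file,
                            Collections.singletonList("written by thread " + numThread),
                            StandardCharsets.UTF_8);
                    return file;
                }));
            }
            List<Path> files = new ArrayList<>();
            for (Future<Path> f : futures) {
                files.add(f.get()); // waits for the task and rethrows its errors
            }
            return files;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("threads");
        for (Path file : writeInParallel(dir, 3)) {
            System.out.println(file.getFileName() + ": " + Files.readAllLines(file));
        }
    }
}
```

The same idea applies to database writes: each thread should target rows that no other thread touches.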

If the job connects to a database, the connection component must not use a shared connection (uncheck the "Use or save a shared connection to a database" box).

It can be useful for each launched thread to know which thread it is. To do so, add two integer variables to the job's context: the thread number (num_thread) and the total number of threads (nb_threads).

The logs can then show which thread is running, for example with the following line in a tJava:

log.info("-- Starting thread " + context.num_thread + "/" + context.nb_threads + " --");

As a reminder that this processing will run in several threads, give the job an explicit name ("threadImport", for example).

Calling the job in multiple threads

Then create a second job. Place a tRunJob that calls the first job on the design workspace, and repeat this once for each thread you want to create. In this example, three tRunJob components are used, so three threads are spawned at runtime.

In the Context Param table of each tRunJob, assign values to the two variables created earlier: num_thread (1, 2 and 3 respectively) and nb_threads (3 in every component).

In the parent job's settings, go to Extra and check Multi-threaded execution. With this option, all the sub-jobs of the parent job are launched in parallel threads rather than one after another. Each tRunJob must therefore be its own sub-job, not linked to the others by triggers such as OnSubjobOk.

Save the job under an explicit name indicating that it performs multi-threading (such as "multithreadImport").

Then test the processing. You should see output close to the following, in an order that varies between runs:

-- Starting thread 1/3 --
-- Starting thread 2/3 --
-- Starting thread 3/3 --
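The parent job's behaviour can be pictured in plain Java: start one task per tRunJob, give each its own num_thread value and the common nb_threads value, then wait for all of them. This is only an analogy, not what Talend generates for tRunJob or Multi-threaded execution, and the class and method names are hypothetical. It prints the same kind of lines as above, in an order that can change between runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRunJobs {

    // Stands in for the child job ("threadImport"): it receives its two
    // context values and returns the message it logs.
    static String childJob(int numThread, int nbThreads) {
        String message = "-- Starting thread " + numThread + "/" + nbThreads + " --";
        System.out.println(message);
        return message;
    }

    // Stands in for the parent job ("multithreadImport") with multi-threaded
    // execution: one task per tRunJob, all started together, then the parent
    // waits for every task to finish.
    static List<String> runAll(int nbThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nbThreads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int num = 1; num <= nbThreads; num++) {
                final int numThread = num; // like the num_thread Context Param
                Callable<String> task = () -> childJob(numThread, nbThreads);
                futures.add(pool.submit(task));
            }
            List<String> messages = new ArrayList<>();
            for (Future<String> f : futures) {
                messages.add(f.get()); // blocks until that task has ended
            }
            return messages;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        runAll(3); // the printing order varies from one run to the next
    }
}
```

The returned list is in submission order because the futures are read in that order, even though the lines are printed in whatever order the threads happen to run.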

Processing a list with multi-threading

The processing of a list can also be split between the threads:

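  • Create a routine named "StaticVariable" (StaticVariable.java) and add the following Java code to its class, along with the java.util.List and java.util.ArrayList imports:
    public static List<String> liste = new ArrayList<>();
    Because the list is static, it is visible to, and shared by, all the jobs running in the same JVM (that is, as long as the tRunJob components do not use the "Use an independent process to run subjob" option).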
  • Create a new job and fill the list from a file or a query (for example, with a tDBInput linked to a tJavaRow).
  • In the tJavaRow, add the following code (here, for a list of customers stored in a "client" column):
    StaticVariable.liste.add(input_row.client);
  • Add a tRunJob that calls the job performing the multi-threading, triggered by an OnSubjobOk link so that the list is complete before the threads start.
  • In the job at the end of the chain (the one launched in each thread), filter the list so that each thread processes only its own part of it, as follows:

    • Add a tJavaFlex that iterates over the list. Put the "for" loop header in the "Start code" part and its closing brace in the "End code" part; in "Main code", copy the current element into the output row.
      for (String client : StaticVariable.liste) {
    • Place a tMap and add a "line_number" output field to number the rows.

      Assign the following code to line_number, using the thread number so that the threads do not compete for the same counter:
    • Place a tFilterRow to keep only the rows assigned to the current thread. In advanced mode, specify the following condition, which distributes the rows according to the thread number and the total number of threads:
      input_row.line_number % context.nb_threads == (context.num_thread - 1)
    • Store the result in a tHashOutput, which will hold the rows filtered for this thread.
  • Run the main job to check that everything works as expected.
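The chain above can be simulated in plain Java to check that the modulo condition gives each element of the list to exactly one thread. This is a sketch outside Talend with hypothetical class and method names. Each thread numbers the rows with its own local counter starting at 0; that counter is an assumption standing in for the line_number field, and the split works the same way as long as every thread numbers the rows identically with consecutive integers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plays the role of the "StaticVariable" routine: one list per JVM.
class StaticVariable {
    public static List<String> liste = new ArrayList<>();
}

public class SplitListAcrossThreads {

    // Loading job (tDBInput -> tJavaRow): fills the shared list sequentially.
    static void loadList(List<String> clients) {
        for (String client : clients) {
            StaticVariable.liste.add(client);
        }
    }

    // Child job: walks the whole list (tJavaFlex), numbers each row with a
    // counter local to this thread (tMap), keeps only the rows whose number
    // matches this thread (tFilterRow) and stores them in its own output
    // (tHashOutput). The shared list is only read here, never modified.
    static List<String> threadJob(int numThread, int nbThreads) {
        List<String> kept = new ArrayList<>();
        int lineNumber = 0; // assumption: numbering starts at 0 in every thread
        for (String client : StaticVariable.liste) {
            if (lineNumber % nbThreads == (numThread - 1)) {
                kept.add(client);
            }
            lineNumber++;
        }
        return kept;
    }

    // Parent job: starts one child per thread once the list is fully loaded.
    static List<List<String>> runThreads(int nbThreads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nbThreads);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int num = 1; num <= nbThreads; num++) {
                final int numThread = num;
                futures.add(pool.submit(() -> threadJob(numThread, nbThreads)));
            }
            List<List<String>> shares = new ArrayList<>();
            for (Future<List<String>> f : futures) {
                shares.add(f.get());
            }
            return shares;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        loadList(Arrays.asList("alice", "bob", "carol", "dave", "erin", "frank", "grace"));
        List<List<String>> shares = runThreads(3);
        for (int i = 0; i < shares.size(); i++) {
            System.out.println("thread " + (i + 1) + "/3: " + shares.get(i));
        }
    }
}
```

Because the threads only read the list once it is fully loaded, no locking is needed; if a thread had to add to the list while others were reading it, a plain ArrayList would no longer be safe.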
Last modified on 01/01/2021 - Quillevere.net
