Microservices in action: Building reliable Adapter Framework (Part 2)

This is the second post on building an Adapter Framework using the Microservice Architecture (MCSA AF). The AF (ABCS in Oracle terms) has been chosen as the technical layer for the MCSA implementation because integrating various data sources is a crucial element of any modern infrastructure [11], especially in analytics solutions.

Read Part 1 here: Microservices in action: Building reliable Adapter Framework (Part 1)

Microservices are in some sense like Leibniz's monads: imperatively responsive and message driven [12]. These two characteristics, together with two other MCS characteristics, are what we are trying to achieve in this Java AF implementation. The implementation of the file adapter's core functionality was explained in the previous publication; the other essential parts are discussed here, including, but not limited to, patterns and methods for improving reliability, resiliency and responsiveness.

FSO Operations

File System Object (FSO) operations are the second priority after data parsing in the implementation queue. Although requirements may differ, they are based on the same basic functional modules, such as:

1. get the file list from a directory

public List<String> getFileList(String dirstr) {
      List<String> filelist = new ArrayList<String>();
      try {
            File[] files = new File(dirstr).listFiles();
            // listFiles() returns null if the directory does not exist or cannot be read
            if (files != null) {
                  for (File file : files) {
                        if (file.isFile()) {
                              filelist.add(file.getName());
                        }
                  }
            }
      } catch (Exception e) {
            e.printStackTrace();
      }
      return filelist;
}

2. get the list of subdirectories

public static List<String> getDir(String xmlindir) throws IOException {
      String[] directories = null;
      try {
            File file = new File(xmlindir);
            directories = file.list(new FilenameFilter() {
                  public boolean accept(File current, String name) {
                        return new File(current, name).isDirectory();
                  }
            });
            System.out.println(Arrays.toString(directories));
      } catch (Exception e) {
            e.printStackTrace();
      }
      // list() returns null on I/O error or if xmlindir is not a directory
      return directories == null ? new ArrayList<String>() : Arrays.asList(directories);
}

3. get general file attributes (could be useful for sorting and logging)

public static void getFileAttr(String file) {
      try {
            Path path = Paths.get(file);
            BasicFileAttributes bfa = Files.readAttributes(path, BasicFileAttributes.class);

            System.out.println("Creation Time : " + bfa.creationTime());
            System.out.println("Last Access Time : " + bfa.lastAccessTime());
            System.out.println("Last Modified Time : " + bfa.lastModifiedTime());
            System.out.println("Is Directory : " + bfa.isDirectory());
            System.out.println("Is Other : " + bfa.isOther());
            System.out.println("Is Regular File : " + bfa.isRegularFile());
            System.out.println("Is Symbolic Link : " + bfa.isSymbolicLink());
            System.out.println("Size : " + bfa.size());
      } catch (Exception e) {
            e.printStackTrace();
      }
}

4. delete a file

public static int deleteFile(String fullpathtoparse) {
      int status = 1;
      logger.info("Deleting file: " + fullpathtoparse);
      try {
            File parsedfile = new File(fullpathtoparse + Config.STAT_FFLOG);
            if (parsedfile.delete()) {
                  status = 0;
                  System.out.println("Deleted OK " + fullpathtoparse);
            } else {
                  System.out.println("Problem with deleting file");
            }
      } catch (Exception e) {
            logger.severe("Unable to delete file: " + e.getMessage());
            e.printStackTrace();
      }
      return status;
}

5. check for file presence (in this case a lock file, checking whether the directory is already locked by another parser instance)

public boolean fileExists(String stat_dir) throws IOException {
      boolean locked = false;
      String lockfile = stat_dir + Config.STAT_FFLOG + ".lck";
      try {
            File file = new File(lockfile);
            // the directory is locked if the lock file exists and is a regular file
            locked = file.exists() && file.isFile();
      } catch (Exception e) {
            e.printStackTrace();
      }
      return locked;
}

Obviously, parsing starts with obtaining the file list from a given directory:

      try {
            // check for a lock - a parser currently running in this directory
            if (!fileExists(dirtoparse)) {
                  // get the file list from this directory
                  List<String> statdatafiles = filehandler.getSTATFilesfromDir(dirtoparse);
                  if (statdatafiles.size() > 0) {
                        logger.info("Identified " + statdatafiles.size() + " new XML files. Parsing individually");
                        // main loop
                        for (String filename : statdatafiles) {
                              logger.info("Parsing new XML file : " + filename);
                              .....
                              // call the parser, check the status

The first issue to solve is maintaining the processing history, to avoid re-processing files and duplicating data in the analytics DB. For instance, what if there is no write access to the DB for maintaining a runtime log with the list of processed files, because the product owner doesn't want to maintain it and keep up with housekeeping jobs? What if we cannot even delete or archive processed files? The file processing requirements for the two parsers are different:

  1. FF files are all in the same directory, so traversing subdirectories is not required. A processed file cannot be renamed or moved. Filenames are incremental, sortable and comparable.
  2. XML files are dispersed over many subdirectories and constantly updated. Filenames are random and not incremental. A processed file can be moved to the Archive directory.

For unit test simplification and unification, the following folder structure is proposed.

On JIT and Production, the folder structure will be defined by the Client (TradingPartner) and stored in a config *.property file, but for now all inbound XML files will be in <TradingPartner>/In and its subdirectories, and archived in /Archive. FF files will be in /In only.
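Based on that description, the layout looks roughly like this (only In and Archive come from the text; the nesting shown is illustrative):

<TradingPartner>/
      In/              inbound XML and FF files
            <subdir>/  further inbound XML files (XML exporter only)
      Archive/         processed XML files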

Since all FF files have sequential, incremental naming, the last successfully processed file can be stored in a simple property file with two parameters, one for the filename and another for the directory, so that the new file list (constructed by function 1 above) for the next scanning session is built from the next file in line. For recording these parameters, a setParams() method is created in a Config utility class; the parameters are read back with getLastParsedFile() every time the list is created.
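As a minimal sketch of that selection logic (getFileList() is function 1 above; getLastParsedFile() is the Config accessor mentioned in the text; Config.FF_IN_DIR is an assumed property name, and FF filenames are assumed to sort lexicographically in processing order):

      String lastParsed = Config.getLastParsedFile(); // accessor for last_processed_file
      List<String> newFiles = new ArrayList<String>();
      // keep only the files that sort after the last successfully processed one
      for (String name : getFileList(Config.FF_IN_DIR)) { // Config.FF_IN_DIR is illustrative
            if (lastParsed == null || name.compareTo(lastParsed) > 0) {
                  newFiles.add(name);
            }
      }
      Collections.sort(newFiles); // parse in incremental filename order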

Property reading

            Properties prop = new Properties();
            String propfile = STAT_FFTRACE;
            File fp = new File(curdir + propfile);
            String fname = null;
            try {
                  prop.load(new FileInputStream(fp));
                  // get the property's value
                  fname = prop.getProperty("last_processed_file");
            } catch (FileNotFoundException e) {
      .....

Property storing

            Properties prop = new Properties();
            String propfile = STAT_FFTRACE;
            File fp = new File(curdir + propfile);
            try {
                  // set the property's values
                  prop.setProperty("last_processed_file", filename);
                  prop.setProperty("last_processed_directory", curdir);
                  // save properties, then release the stream
                  FileOutputStream fileOut = new FileOutputStream(fp);
                  prop.store(fileOut, null);
                  fileOut.close();
            } catch (IOException io) {
      ...

Similar functionality is used for reading and assigning global runtime parameters from the main configuration file (managed in an assembly XMLDBExporter project), whose structure will be discussed later (in the Docker & Vagrant part). To complete the parsing management part, the concurrency issue must be addressed first: running more than one instance of the exporter in the same directory must be avoided, so some lock mechanism is required, and if a lock flag is set, a new instance will not start. Here the Java Logging API can help.

The proposed logging strategy is one log per exporter run. In case of an error, the directory parsing is aborted, and the last processed filename is preserved in the property file (ff-exporter) or the file is moved to the Archive folder (xml-exporter). When a log file is created, an additional *.lck file is maintained by the Logging API to prevent a second instance from starting: the directory scanner checks for the presence of <logfilename>.log.lck in the current folder, where logfilename is set in the global property file; see the code snippet for the boolean function fileExists (5).
The logger class is in the utility package and has two main methods.

First, the logger setup (the .lck file is created when parsing starts, and the directory is locked):

static public void setup(String dirtoparse) throws IOException {
      // get the global logger to configure it
      Logger logger = Logger.getLogger(Logger.GLOBAL_LOGGER_NAME);
      // suppress the logging output to the console
      Logger rootLogger = Logger.getLogger("");
      Handler[] handlers = rootLogger.getHandlers();
      if (handlers.length > 0 && handlers[0] instanceof ConsoleHandler) {
            rootLogger.removeHandler(handlers[0]);
      }
      // set the logging level from the Config file
      if ("3".equals(Config.LOGGING_LEVEL))
            logger.setLevel(Level.INFO);
      .......

      fileTxt = new FileHandler(dirtoparse.trim() + Config.XMLLOG);
      formatterTxt = new SimpleFormatter();
      fileTxt.setFormatter(formatterTxt);
      logger.addHandler(fileTxt);
}

And the logger context destroyer (called right after the file(s) have been parsed, in order to remove the locks):

public static void contextDestroyed(Logger logger) {
      // get all handlers
      Handler[] handlers = logger.getHandlers();
      // remove and close all handlers; closing the FileHandler releases the .lck file
      for (int i = 0; i < handlers.length; i++) {
            logger.removeHandler(handlers[i]);
            handlers[i].close();
      }
}

DBO Operations

1. Enhancing the XML object with MySQL DB data

After constructing the object, the XML parser should update the newly constructed object with data from the MySQL DB, using some of the acquired elements as keys. This decision was discussed in the first part, where some design patterns were suggested for improving the reliability of this approach.

First, the driver must be loaded, using a property from the config file; Java reflection is employed again:

try {
      Class.forName(Config.JDBC_DRIVER).newInstance();
      System.out.println("MySQL Driver obtained; ");
} catch (Exception ex) {
      System.out.println("Failed to obtain MySQL Driver; ");
      ex.printStackTrace();
}

Afterwards, the connect() method is implemented as shown below (the core of getConnection() first):

Connection connection = null;
.....
LoadDriver();
connection = DriverManager
      .getConnection(Config.PROJECT_DB_URL + Config.DB_NAME +
            "?user=" + Config.UserName + "&password=" + Config.Password);

Now, using the Failsafe library and lambda expressions, we can implement a retry policy:

RetryPolicy retryPolicy = new RetryPolicy()
   .retryOn(Exception.class)
   .withDelay(1, TimeUnit.SECONDS)
   .withMaxRetries(3);

........

public Connection getConnection() {
      Connection connection = null;
      try {
            connection = Failsafe.with(retryPolicy).get(() -> connect());
      } catch (Exception ex) {
            LOGGER.severe("Failed to obtain MySQL connection; ");
            ex.printStackTrace();
      }
      return connection;
}
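With a resilient connection in hand, the enrichment itself is a parameterized lookup. A minimal sketch, assuming a hypothetical clients table and key column (the real table and column names come from the project schema, not from this post):

public String lookupClientName(String clientKey) {
      String result = null;
      // hypothetical table and columns, for illustration only
      String sql = "SELECT client_name FROM clients WHERE client_key = ?";
      try (Connection con = getConnection();
            PreparedStatement ps = con.prepareStatement(sql)) {
            ps.setString(1, clientKey);
            try (ResultSet rs = ps.executeQuery()) {
                  if (rs.next()) {
                        result = rs.getString("client_name");
                  }
            }
      } catch (SQLException ex) {
            LOGGER.severe("Enrichment lookup failed: " + ex.getMessage());
            ex.printStackTrace();
      }
      return result;
}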

2. Posting the parsed object to the ES DB

The final stage, according to the logic depicted in the sequence diagram at the top of the document, is sending the constructed object to the ES DB as JSON over HTTP, using the Elasticsearch HTTP API on port 9200.

es_url = Config.ES_LOCALHTTP_URL + es_index;
HttpParams httpParams = new BasicHttpParams();
HttpProtocolParams.setContentCharset(httpParams, HTTP.UTF_8);
HttpProtocolParams.setHttpElementCharset(httpParams, HTTP.UTF_8);
HttpClient client = new DefaultHttpClient(httpParams);
HttpPost request = new HttpPost(es_url);
Gson gson = new Gson();
String jsonString = gson.toJson(json);

request.setEntity(new StringEntity(jsonString, HTTP.UTF_8));
request.setHeader("Accept", "application/json");
request.setHeader("Content-type", "application/json");
HttpResponse response = client.execute(request);

if (response.getStatusLine().getStatusCode() != 201) {
      throw new RuntimeException("Failed : HTTP error code : "
            + response.getStatusLine().getStatusCode());
} else {
      status = 0;
}

Make sure the right encoding is used for the HTTP protocol parameters; it is not necessarily the same encoding we use for XML file parsing. Furthermore, bear in mind that in this set of projects we have to maintain three encoding parameters: the two mentioned above, and a third in the Maven pom header. The third one is essential, otherwise the project will not compile with the current Nordic character parsing. For better traceability and control, the last action after the HTTP POST should be parsing the response.
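For reference, the third parameter lives in the pom header and is typically declared as follows (these are standard Maven properties; UTF-8 is shown as an example value):

<properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>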

BufferedReader br = new BufferedReader(
      new InputStreamReader(response.getEntity().getContent()));
String output;
System.out.println("Output from Server .... \n"); // in production use the logger
while ((output = br.readLine()) != null) {
      System.out.println(output); // for unit tests only: verification of the server response
}

Scheduler

To support all the functionality described above, task execution scheduling must be established. The starting point of every file adapter is a directory scanner that builds the file list to parse, so this task is the subject of automation. As all adapters are standalone command-line utility applications, we cannot use javax.ejb.Schedule. The Java ScheduledExecutorService seems to be a suitable alternative, but its error handling (a task that throws an exception is silently cancelled, with no further runs) has ignited heated articles like [10]. Despite the rather strong language, that article is genuinely useful and should be consulted for mission-critical implementations; here a somewhat simplified approach is employed.
First, the class should implement the Runnable interface:

public class FileHandler implements Runnable

Then we have a simple run method, such as:

public void run() {
...
   try {
      filehandler.parseData(null);
   } catch (Exception e) {
      e.printStackTrace();
   }
 }

To improve the reliability and traceability of the run method, we can again employ the Failsafe library, this time applying its Circuit Breaker pattern implementation in order to fail fast and not keep looping when something is critically wrong (no DB or ES connection, file corruption, etc.). The Circuit Breaker can execute a Runnable or Callable (via lambda expressions, as with the retries), but here we use the standalone syntax and convert run() as demonstrated below.

public class FileHandler implements Runnable {
      CircuitBreaker breaker = new CircuitBreaker()
            .withFailureThreshold(3, 10)
            .withSuccessThreshold(5)
            .withDelay(1, TimeUnit.MINUTES);
……

public void run() {
      FileHandler filehandler = new FileHandler();
      // do not call open()/halfOpen()/close() here: the breaker manages its own
      // state transitions via recordSuccess()/recordFailure()
      if (breaker.allowsExecution()) {
            try {
                  filehandler.parseData(null);
                  breaker.recordSuccess();
            } catch (Exception e) {
                  logger.severe("Critical error, cannot proceed with XML Data Import");
                  breaker.recordFailure(e);
                  e.printStackTrace();
            }
      }
}

The inbound directory URL is passed as null, forcing the handler to use the directory declared in the configuration properties.
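Inside parseData(), that fallback can be as simple as the following (Config.XML_IN_DIR is an assumed name for the configured inbound directory):

      if (dirtoparse == null) {
            // no explicit directory given: fall back to the configured inbound directory
            dirtoparse = Config.XML_IN_DIR; // assumed property name, for illustration
      }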

Note: Unfortunately, rigorous testing of the preceding Circuit Breaker snippet under several error conditions didn't show much improvement in error transparency and resolution (the scheduled task still dies with no clear reason reported back), exactly as demonstrated in [10], so do heed that article's advice regarding improving the Executor.

The main method actually implements the scheduling functionality; the properties must be read and initialized first.

public static void main(String[] args) {
      Config.getParams();
      System.out.println("Scanning with frequency " + Config.XMLPARSER_TIMEOUT);
      Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
            new FileHandler(), 0, Config.XMLPARSER_TIMEOUT, TimeUnit.SECONDS);
}

The interval is in seconds, as you can see from the preceding snippet, and the parameter is configurable. The exporter's main method should be able to accept different sets of parameters, as follows (see the dispatch sketch after this list):

  1. Two parameters - an individual file and the ES index to use; good for unit testing;
  2. One parameter - a single directory; suitable for test and prod modes;
  3. No parameters - production mode, running FileHandler via the scheduler as explained above, with all parameters taken from the config file.
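A minimal sketch of how the main method above could dispatch on the argument count (parseFile() is an assumed helper name for single-file parsing; parseData() and the scheduler call come from the snippets above):

public static void main(String[] args) {
      Config.getParams();
      FileHandler filehandler = new FileHandler();
      try {
            if (args.length == 2) {
                  // unit test mode: an individual file and the ES index to use
                  filehandler.parseFile(args[0], args[1]); // assumed helper, for illustration
            } else if (args.length == 1) {
                  // single directory: suitable for test and prod modes
                  filehandler.parseData(args[0]);
            } else {
                  // production mode: scheduled scanning, all parameters from the config file
                  Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
                        new FileHandler(), 0, Config.XMLPARSER_TIMEOUT, TimeUnit.SECONDS);
            }
      } catch (Exception e) {
            e.printStackTrace();
      }
}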

Now these components are ready for containerization.

References

  1. http://blog.zenika.com/2014/10/07/Setting-up-a-development-environment-using-Docker-and-Vagrant/
  2. Microservice Design pattern: http://soapatterns.org/design_patterns/microservice 
  3. http://zeroturnaround.com/rebellabs/java-tools-and-technologies-landscape-2016/
  4. http://blog.osgi.org/2014/08/is-docker-eating-javas-lunch.html
  5. Principles of Service Design, Thomas Erl, 2007, Prentice Hall
  6. SOA. Concepts, Technology and Design, Thomas Erl, 2005, Prentice Hall
  7. Web Services Choreography Description Language Version 1.0 http://www.w3.org/TR/ws-cdl-10/#Purpose-of-WS-CDL
  8. XML to JSON https://github.com/stanfy/gson-xml
  9. https://developer.android.com/reference/org/xmlpull/v1/XmlPullParser.html
  10. http://code.nomad-labs.com/2011/12/09/mother-fk-the-scheduledexecutorservice/
  11. http://customerthink.com/the-what-and-where-of-big-data-a-data-definition-framework/
  12. https://sites.google.com/a/mammatustech.com/mammatusmain/reactive-microservices

 

About the blogger:
Sergey has a Master of Science degree in Engineering and is a Certified Trainer in SOA Architecture, Governance and Security. He has several publications in this field, including the comprehensive practical guide "Applied SOA Patterns on the Oracle Platform" (Packt Publishing, 2014).
