Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.11.0</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/com/iemr/common/identity/ScheduledSyncJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.iemr.common.identity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.iemr.common.identity.data.elasticsearch.ElasticsearchSyncJob;
import com.iemr.common.identity.service.elasticsearch.SyncJobService;

/**
* Scheduled jobs for Elasticsearch sync
*
* To enable scheduled sync, set:
* elasticsearch.sync.scheduled.enabled=true in application.properties
*/
@Component
public class ScheduledSyncJob {

private static final Logger logger = LoggerFactory.getLogger(ScheduledSyncJob.class);

@Autowired
private SyncJobService syncJobService;

@Value("${elasticsearch.sync.scheduled.enabled:true}")
private boolean scheduledSyncEnabled;

/**
* Run full sync every day at 2 AM
* Cron: second, minute, hour, day, month, weekday
*/
@Scheduled(cron = "${elasticsearch.sync.scheduled.cron:0 0 2 * * ?}")
public void scheduledFullSync() {
if (!scheduledSyncEnabled) {
logger.debug("Scheduled sync is disabled");
return;
}

logger.info("========================================");
logger.info("Starting scheduled full sync job");
logger.info("========================================");

try {
// Check if there's already a sync running
if (syncJobService.isFullSyncRunning()) {
logger.warn("Full sync already running. Skipping scheduled sync.");
return;
}

// Start async sync
ElasticsearchSyncJob job = syncJobService.startFullSyncJob("SCHEDULER");
logger.info("Scheduled sync job started: jobId={}", job.getJobId());

} catch (Exception e) {
logger.error("Error starting scheduled sync: {}", e.getMessage(), e);
}
}

/**
* Clean up old completed jobs (keep last 30 days)
* Runs every Sunday at 3 AM
*/
@Scheduled(cron = "0 0 3 * * SUN")
public void cleanupOldJobs() {
if (!scheduledSyncEnabled) {
return;
}

logger.info("Running cleanup of old sync jobs...");

// TODO: Implement cleanup logic
// Delete jobs older than 30 days with status COMPLETED or FAILED
}
}
100 changes: 100 additions & 0 deletions src/main/java/com/iemr/common/identity/config/ElasticsearchConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.iemr.common.identity.config;

import java.io.IOException;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticsearchConfig {

private static final Logger logger = LoggerFactory.getLogger(ElasticsearchConfig.class);

@Value("${elasticsearch.host}")
private String esHost;

@Value("${elasticsearch.port}")
private int esPort;

@Value("${elasticsearch.username}")
private String esUsername;

@Value("${elasticsearch.password}")
private String esPassword;

@Value("${elasticsearch.index.beneficiary}")
private String indexName;

@Bean
public ElasticsearchClient elasticsearchClient() {
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(esUsername, esPassword)
);

RestClient restClient = RestClient.builder(
new HttpHost(esHost, esPort, "http")
).setHttpClientConfigCallback(httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
).build();

ElasticsearchTransport transport = new RestClientTransport(
restClient,
new JacksonJsonpMapper()
);

return new ElasticsearchClient(transport);
}

@Bean
public Boolean createIndexMapping(ElasticsearchClient client) throws IOException {

// Check if index exists
boolean exists = client.indices().exists(e -> e.index(indexName)).value();

if (!exists) {
client.indices().create(c -> c
.index(indexName)
.mappings(m -> m
.properties("beneficiaryRegID", p -> p.keyword(k -> k))
.properties("firstName", p -> p.text(t -> t
.fields("keyword", f -> f.keyword(k -> k))
.analyzer("standard")
))
.properties("lastName", p -> p.text(t -> t
.fields("keyword", f -> f.keyword(k -> k))
.analyzer("standard")
))
.properties("phoneNum", p -> p.keyword(k -> k))
.properties("fatherName", p -> p.text(t -> t.analyzer("standard")))
.properties("spouseName", p -> p.text(t -> t.analyzer("standard")))
.properties("aadharNo", p -> p.keyword(k -> k))
.properties("govtIdentityNo", p -> p.keyword(k -> k))
)
.settings(s -> s
.numberOfShards("3")
.numberOfReplicas("1")
.refreshInterval(t -> t.time("1s"))
)
);

logger.info("Created Elasticsearch index with proper mappings");
}
return true;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.iemr.common.identity.config;

import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
* Configuration for async processing and scheduling
*/
@Configuration
@EnableAsync
@EnableScheduling
public class ElasticsearchSyncConfig {

/**
* Thread pool for Elasticsearch sync operations
* Configured for long-running background jobs
*/
@Bean(name = "elasticsearchSyncExecutor")
public Executor elasticsearchSyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

// Only 1-2 sync jobs should run at a time to avoid overwhelming DB/ES
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("es-sync-");
executor.setKeepAliveSeconds(60);

// Handle rejected tasks
executor.setRejectedExecutionHandler((r, executor1) -> {
throw new RuntimeException("Elasticsearch sync queue is full. Please wait for current job to complete.");
});

executor.initialize();
return executor;
}

/**
* General purpose async executor
*/
@Bean(name = "taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("async-");
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public String getBeneficiaries(
JsonElement json = JsonParser.parseString(searchFilter);
IdentitySearchDTO searchParams = InputMapper.getInstance().gson().fromJson(json, IdentitySearchDTO.class);

List<BeneficiariesDTO> list = svc.getBeneficiaries(searchParams);
List<BeneficiariesDTO> list = svc.getBeneficiarieswithES(searchParams);
list.removeIf(Objects::isNull);
Collections.sort(list);
response = getSuccessResponseString(list, 200, "success", "getBeneficiariesByAdvanceSearch");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.iemr.common.identity.controller;


import java.util.HashMap;
import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import com.iemr.common.identity.service.elasticsearch.ElasticsearchService;
import com.iemr.common.identity.utils.CookieUtil;
import com.iemr.common.identity.utils.JwtUtil;

import jakarta.servlet.http.HttpServletRequest;

/**
* Elasticsearch-enabled Beneficiary Search Controller
* All search endpoints with ES support
*/
@RestController
@RequestMapping("/beneficiary")
public class IdentityESController {

private static final Logger logger = LoggerFactory.getLogger(IdentityESController.class);

@Autowired
private ElasticsearchService elasticsearchService;

@Autowired
private JwtUtil jwtUtil;

/**
* MAIN UNIVERSAL SEARCH ENDPOINT
* Searches across all fields - name, phone, ID, etc.
*
* Usage: GET /beneficiary/search?query=vani
* Usage: GET /beneficiary/search?query=9876543210
*/
@GetMapping("/search")
public ResponseEntity<Map<String, Object>> search(@RequestParam String query, HttpServletRequest request) {
try {
String jwtToken = CookieUtil.getJwtTokenFromCookie(request);
String userId = jwtUtil.getUserIdFromToken(jwtToken);
int userID=Integer.parseInt(userId);
List<Map<String, Object>> results = elasticsearchService.universalSearch(query, userID);

Map<String, Object> response = new HashMap<>();
response.put("data", results);
response.put("statusCode", 200);
response.put("errorMessage", "Success");
response.put("status", "Success");

return ResponseEntity.ok(response);

} catch (Exception e) {
Map<String, Object> errorResponse = new HashMap<>();
errorResponse.put("data", new ArrayList<>());
errorResponse.put("statusCode", 500);
errorResponse.put("errorMessage", e.getMessage());
errorResponse.put("status", "Error");

return ResponseEntity.status(500).body(errorResponse);
}
}


}
Loading
Loading