spring 集群环境中运行的Spring Scheduled Task
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/31288810/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Spring Scheduled Task running in clustered environment
提问by user3131879
I am writing an application that has a cron job that executes every 60 seconds. The application is configured to scale when required onto multiple instances. I only want to execute the task on 1 instance every 60 seconds (On any node). Out of the box I can not find a solution to this and I am surprised it has not been asked multiple times before. I am using Spring 4.1.6.
我正在编写一个应用程序,它有一个每 60 秒执行一次的 cron 作业。应用程序配置为在需要时扩展到多个实例。我只想每 60 秒(在任何节点上)在 1 个实例上执行任务。开箱即用,我找不到解决方案,我很惊讶之前没有多次询问它。我正在使用 Spring 4.1.6。
<task:scheduled-tasks>
<task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
采纳答案by manish
Batch and scheduled jobs are typically run on their own standalone servers, away from customer facing apps so it is not a common requirement to include a job in an application that is expected to run on a cluster. Additionally, jobs in clustered environments typically do not need to worry about other instances of the same job running in parallel so another reason why isolation of job instances is not a big requirement.
批处理和计划作业通常在它们自己的独立服务器上运行,远离面向客户的应用程序,因此通常不要求将作业包含在预期在集群上运行的应用程序中。此外,集群环境中的作业通常不需要担心并行运行的同一作业的其他实例,因此隔离作业实例的另一个原因不是很大的要求。
A simple solution would be to configure your jobs inside a Spring Profile. For example, if your current configuration is:
一个简单的解决方案是在 Spring Profile 中配置您的作业。例如,如果您当前的配置是:
<beans>
<bean id="someBean" .../>
<task:scheduled-tasks>
<task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
</beans>
change it to:
将其更改为:
<beans>
<beans profile="scheduled">
<bean id="someBean" .../>
<task:scheduled-tasks>
<task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
</beans>
</beans>
Then, launch your application on just one machine with the scheduled
profile activated (-Dspring.profiles.active=scheduled
).
然后,仅在一台已scheduled
激活配置文件的机器上启动您的应用程序( -Dspring.profiles.active=scheduled
)。
If the primary server becomes unavailable for some reason, just launch another server with the profile enabled and things will continue to work just fine.
如果主服务器由于某种原因变得不可用,只需启动另一台启用了配置文件的服务器,事情就会继续正常工作。
Things change if you want automatic failover for the jobs as well. Then, you will need to keep the job running on all servers and check synchronization through a common resource such as a database table, a clustered cache, a JMX variable, etc.
如果您还希望作业自动进行故障转移,情况就会发生变化。然后,您需要保持作业在所有服务器上运行,并通过公共资源(例如数据库表、集群缓存、JMX 变量等)检查同步。
回答by Lukas
There is a ShedLockproject that serves exactly this purpose. You just annotate tasks which should be locked when executed
有一个ShedLock项目正是为了这个目的。您只需注释执行时应锁定的任务
@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
// do something
}
Configure Spring and a LockProvider
配置 Spring 和 LockProvider
@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "30s")
class MySpringConfiguration {
...
@Bean
public LockProvider lockProvider(DataSource dataSource) {
return new JdbcTemplateLockProvider(dataSource);
}
...
}
回答by alexanoid
I think you have to use Quartz Clustering with JDBC-JobStorefor this purpose
我认为您必须为此目的使用Quartz Clustering 和 JDBC-JobStore
回答by mspapant
The is another simple and robust way to safe execute a job in a cluster. You can based on database and execute the task only if the node is the "leader" in the cluster.
这是在集群中安全执行作业的另一种简单而强大的方法。只有当节点是集群中的“领导者”时,才能基于数据库执行任务。
Also when a node is failed or shutdown in the cluster another node became the leader.
同样,当集群中的一个节点出现故障或关闭时,另一个节点将成为领导者。
All you have is to create a "leader election" mechanism and every time to check if your are the leader:
你所要做的就是创建一个“leader选举”机制,每次检查你是否是leader:
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
if (checkIfLeader()) {
final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
for (EmailTask emailTask : list) {
dispatchService.sendEmail(emailTask);
}
}
}
Follow those steps:
请按照以下步骤操作:
1.Define the object and table that holds one entry per node in the cluster:
1.定义集群中每个节点保存一个条目的对象和表:
@Entity(name = "SYS_NODE")
public class SystemNode {
/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;
/** The ip. */
@Column(name = "IP")
private String ip;
/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;
/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();
/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;
public Long getId() {
return id;
}
public void setId(final Long id) {
this.id = id;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(final String timestamp) {
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(final String ip) {
this.ip = ip;
}
public Date getLastPing() {
return lastPing;
}
public void setLastPing(final Date lastPing) {
this.lastPing = lastPing;
}
public Date getCreatedAt() {
return createdAt;
}
public void setCreatedAt(final Date createdAt) {
this.createdAt = createdAt;
}
public Boolean getIsLeader() {
return isLeader;
}
public void setIsLeader(final Boolean isLeader) {
this.isLeader = isLeader;
}
@Override
public String toString() {
return "SystemNode{" +
"id=" + id +
", timestamp='" + timestamp + '\'' +
", ip='" + ip + '\'' +
", lastPing=" + lastPing +
", createdAt=" + createdAt +
", isLeader=" + isLeader +
'}';
}
}
}
2.Create the service that a) insert the node in database , b) check for leader
2.创建服务a)在数据库中插入节点,b)检查leader
@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener {
/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);
/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";
/** The ip. */
private String ip;
/** The system service. */
private SystemService systemService;
/** The system node repository. */
private SystemNodeRepository systemNodeRepository;
@Autowired
public void setSystemService(final SystemService systemService) {
this.systemService = systemService;
}
@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
this.systemNodeRepository = systemNodeRepository;
}
@Override
public void pingNode() {
final SystemNode node = systemNodeRepository.findByIp(ip);
if (node == null) {
createNode();
} else {
updateNode(node);
}
}
@Override
public void checkLeaderShip() {
final List<SystemNode> allList = systemNodeRepository.findAll();
final List<SystemNode> aliveList = filterAliveNodes(allList);
SystemNode leader = findLeader(allList);
if (leader != null && aliveList.contains(leader)) {
setLeaderFlag(allList, Boolean.FALSE);
leader.setIsLeader(Boolean.TRUE);
systemNodeRepository.save(allList);
} else {
final SystemNode node = findMinNode(aliveList);
setLeaderFlag(allList, Boolean.FALSE);
node.setIsLeader(Boolean.TRUE);
systemNodeRepository.save(allList);
}
}
/**
* Returns the leaded
* @param list
* the list
* @return the leader
*/
private SystemNode findLeader(final List<SystemNode> list) {
for (SystemNode systemNode : list) {
if (systemNode.getIsLeader()) {
return systemNode;
}
}
return null;
}
@Override
public boolean isLeader() {
final SystemNode node = systemNodeRepository.findByIp(ip);
return node != null && node.getIsLeader();
}
@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (applicationEvent instanceof ContextRefreshedEvent) {
pingNode();
}
}
/**
* Creates the node
*/
private void createNode() {
final SystemNode node = new SystemNode();
node.setIp(ip);
node.setTimestamp(String.valueOf(System.currentTimeMillis()));
node.setCreatedAt(new Date());
node.setLastPing(new Date());
node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
systemNodeRepository.save(node);
}
/**
* Updates the node
*/
private void updateNode(final SystemNode node) {
node.setLastPing(new Date());
systemNodeRepository.save(node);
}
/**
* Returns the alive nodes.
*
* @param list
* the list
* @return the alive nodes
*/
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
final List<SystemNode> finalList = new LinkedList<>();
for (SystemNode systemNode : list) {
if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
finalList.add(systemNode);
}
}
if (CollectionUtils.isEmpty(finalList)) {
LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
}
return finalList;
}
/**
* Finds the min name node.
*
* @param list
* the list
* @return the min node
*/
private SystemNode findMinNode(final List<SystemNode> list) {
SystemNode min = list.get(0);
for (SystemNode systemNode : list) {
if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
min = systemNode;
}
}
return min;
}
/**
* Sets the leader flag.
*
* @param list
* the list
* @param value
* the value
*/
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
for (SystemNode systemNode : list) {
systemNode.setIsLeader(value);
}
}
}
}
3.ping the database to send that your are alive
3.ping数据库发送你还活着
@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
systemNodeService.pingNode();
}
@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
systemNodeService.checkLeaderShip();
}
4.you are ready! Just check if you are the leader before execute the task:
4.你准备好了!在执行任务之前检查你是否是领导者:
@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
if (checkIfLeader()) {
final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
for (EmailTask emailTask : list) {
dispatchService.sendEmail(emailTask);
}
}
}
回答by Will Hughes
dlockis designed to run tasks only once by using database indexes and constraints. You can simply do something like below.
dlock旨在通过使用数据库索引和约束仅运行一次任务。您可以简单地执行以下操作。
@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {
}
See the articleabout using it.
请参阅有关使用它的文章。
回答by RenRen
I'm using a database table to do the locking.Only one task at a time can do a insert to the table. The other one will get a DuplicateKeyException. The insert and delete logic is handeld by an aspectaround the @Scheduled annotation. I'm using Spring Boot 2.0
我正在使用数据库表来进行锁定。一次只有一项任务可以对表执行插入操作。另一个将得到 DuplicateKeyException。插入和删除逻辑由@Scheduled 注释周围的方面处理。我正在使用 Spring Boot 2.0
@Component
@Aspect
public class SchedulerLock {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);
@Autowired
private JdbcTemplate jdbcTemplate;
@Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {
String jobSignature = joinPoint.getSignature().toString();
try {
jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});
Object proceed = joinPoint.proceed();
jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
return proceed;
}catch (DuplicateKeyException e) {
LOGGER.warn("Job is currently locked: "+jobSignature);
return null;
}
}
}
@Component
public class EveryTenSecondJob {
@Scheduled(cron = "0/10 * * * * *")
public void taskExecution() {
System.out.println("Hello World");
}
}
CREATE TABLE scheduler_lock(
signature varchar(255) NOT NULL,
date datetime DEFAULT NULL,
PRIMARY KEY(signature)
);
回答by Gustav Karlsson
You could use an embeddable scheduler like db-schedulerto accomplish this. It has persistent executions and uses a simple optimistic locking mechanism to guarantee execution by a single node.
您可以使用像db-scheduler这样的可嵌入调度程序来完成此操作。它具有持久执行并使用简单的乐观锁定机制来保证由单个节点执行。
Example code for how the use-case can be achieved:
如何实现用例的示例代码:
RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
.execute((taskInstance, executionContext) -> {
System.out.println("Executing " + taskInstance.getTaskAndInstance());
});
final Scheduler scheduler = Scheduler
.create(dataSource)
.startTasks(recurring1)
.build();
scheduler.start();