diff --git a/client/src/components/Form/Form.jsx b/client/src/components/Form/Form.jsx index c629280c0..ea032347e 100644 --- a/client/src/components/Form/Form.jsx +++ b/client/src/components/Form/Form.jsx @@ -142,7 +142,7 @@ class Form extends Root { ); }; - renderSelect = (name, label, items, onChange, selectClass, wrapperClass, rest) => { + renderSelect = (name, label, items, onChange, selectClass, wrapperClass, blankItem, rest) => { const { formData, errors } = this.state; return ( @@ -157,6 +157,7 @@ class Form extends Root { }} selectClass={selectClass} wrapperClass={wrapperClass} + blankItem={blankItem} {...rest} /> ); diff --git a/client/src/components/Form/Select/Select.jsx b/client/src/components/Form/Select/Select.jsx index 53239b169..8279ec5d1 100644 --- a/client/src/components/Form/Select/Select.jsx +++ b/client/src/components/Form/Select/Select.jsx @@ -1,6 +1,6 @@ import React from 'react'; -const Select = ({ name, label, items, error, wrapperClass, selectClass, ...rest }) => { +const Select = ({ name, label, items, error, wrapperClass, selectClass, blankItem, ...rest }) => { let wrapperClassRender = 'form-group'; let selectClassRender = 'col-xs-10'; if (wrapperClass) { @@ -21,6 +21,11 @@ const Select = ({ name, label, items, error, wrapperClass, selectClass, ...rest )}
+ + } +
+ + + {this.renderInput( + name, + `Partition: ${partition.id}`, + 'Offset', + 'number', + undefined, + true, + 'partition-input-div', + `partition-input ${name}-input` + )} + +
+
+ ); + }); + + return renderedInputs; + }; + + unCheckAll = (value) => { + const {checked} = this.state; + + Object.keys(checked).forEach(name => { + checked[name] = value; + }); + + this.setState({ checked}); + } + + resetToFirstOffsets = () => { + const { selectedTopic, formData } = this.state; + + selectedTopic.partitions.forEach(partition => { + const name = `partition-${partition.id}`; + formData[name] = partition.firstOffset || '0'; + }); + + this.setState({ formData }); + }; + + resetToLastOffsets = () => { + const { selectedTopic, formData } = this.state; + + selectedTopic.partitions.forEach(partition => { + const name = `partition-${partition.id}`; + formData[name] = partition.lastOffset || '0'; + }); + + this.setState({ formData }); + }; + + resetToCalculatedOffsets = ({ currentTarget: input }) => { + const { selectedTopic, formData } = this.state; + + selectedTopic.partitions.forEach(partition => { + const name = `partition-${partition.id}`; + const calculatedOffset = (partition.lastOffset || 0) - input.value; + formData[name] = (!calculatedOffset || calculatedOffset < 0 )? '0' : calculatedOffset; + }); + + formData['lastMessagesNr'] = input.value; + this.setState({ formData }); + }; + + async getTopicOffset() { + const { clusterId, topicId, timestamp} = this.state; + const momentValue = moment(timestamp); + + const date = + timestamp.toString().length > 0 + ? 
formatDateTime( + { + year: momentValue.year(), + monthValue: momentValue.month(), + dayOfMonth: momentValue.date(), + hour: momentValue.hour(), + minute: momentValue.minute(), + second: momentValue.second(), + milli: momentValue.millisecond() + }, + 'YYYY-MM-DDTHH:mm:ss.SSS' + ) + 'Z' + : ''; + + let data = {}; + if (date !== '') { + data = await this.getApi(uriTopicsOffsetsByTimestamp(clusterId, topicId, date)); + data = data.data; + this.handleOffsetsByTimestamp(data); + } + } + + handleOffsetsByTimestamp = partitions => { + const { formData } = this.state; + partitions.forEach(partition => { + const name = `partition-${partition.partition}`; + formData[name] = partition.offset || '0'; + }); + this.setState({ formData }); + }; + + renderResetButton = () => { + const { timestamp, formData} = this.state; + const { loading } = this.props.history.location; + + return ( + + +
this.unCheckAll(true)} + > + Check all +
+
this.unCheckAll(false)} + > + Uncheck all +
+
this.resetToFirstOffsets()} + > + Reset to first offsets +
+
this.resetToLastOffsets()} + > + Reset to last offsets +
+
+ + Filter datetime + {!loading && ( + +
+ { + this.setState({ timestamp: value }, () => this.getTopicOffset()); + }} + /> +
+
+ )} +
+ +
+ +
+ + Last x messages per partition + {!loading && ( + +
+ +
+
+ )} +
+ +
+
+ ); + }; + + + render() { + const { clusterId, topicId, clusters, topics } = this.state; + + return ( +
+
+
this.handleSubmit()} + > + {this.renderTopicPartition()} +
+ Target Topic + + {this.renderSelect( + 'clusterListView', + 'Cluster', + clusters, + ({ currentTarget: input }) => { + const { formData } = this.state; + formData.clusterListView = input.value; + this.setState({formData}); + this.getTopics(input.value); + }, + 'col-sm-10', + 'select-wrapper select-wrapper-copy', + false, + { className: 'form-control' } + )} + + {this.renderSelect( + 'topicListView', + 'Topic', + topics, + ({ currentTarget: input }) => { + const { formData } = this.state; + formData.topicListView = input.value; + this.setState({formData}); + + }, + 'col-sm-10', + 'select-wrapper select-wrapper-copy', + true, + { className: 'form-control' } + )} +
+ + {this.renderButton( + 'Copy', + this.handleSubmit, + undefined, + 'submit', + this.renderResetButton() + )} +
+
+ ); + } +} + +export default TopicCopy; diff --git a/client/src/containers/Topic/TopicCopy/index.js b/client/src/containers/Topic/TopicCopy/index.js new file mode 100644 index 000000000..6650f2d57 --- /dev/null +++ b/client/src/containers/Topic/TopicCopy/index.js @@ -0,0 +1,3 @@ +import TopicCopy from './TopicCopy'; + +export default TopicCopy; diff --git a/client/src/containers/Topic/TopicCopy/styles.scss b/client/src/containers/Topic/TopicCopy/styles.scss new file mode 100644 index 000000000..a1b56d242 --- /dev/null +++ b/client/src/containers/Topic/TopicCopy/styles.scss @@ -0,0 +1,21 @@ +.filter-datetime { + width: 100%; + display: block; + margin: auto; + overflow: hidden; + padding-left: 1.5rem; +} + +.row-checkbox { + margin-left: 0; + align-items: baseline; +} + +.select-wrapper-copy { + width: 80%; + margin-bottom: 1rem; +} + +.input-nr-messages { + margin-left: 3px; +} \ No newline at end of file diff --git a/client/src/utils/Routes.js b/client/src/utils/Routes.js index ce3a578fb..1eeceae12 100644 --- a/client/src/utils/Routes.js +++ b/client/src/utils/Routes.js @@ -13,6 +13,7 @@ import ConnectCreate from '../containers/Connect/ConnectCreate/ConnectCreate'; import Connect from '../containers/Connect/ConnectDetail/Connect'; import TopicCreate from '../containers/Topic/TopicCreate/TopicCreate'; import TopicProduce from '../containers/Topic/TopicProduce'; +import TopicCopy from '../containers/Topic/TopicCopy'; import Loading from '../containers/Loading'; import ConsumerGroupList from '../containers/ConsumerGroup/ConsumerGroupList'; import ConsumerGroup from '../containers/ConsumerGroup/ConsumerGroupDetail'; @@ -145,6 +146,11 @@ class Routes extends Root { component={TopicProduce} /> )} + + {roles && roles.topic && roles.topic['topic/data/insert'] && ( + + )} + {roles && roles.topic && roles.topic['topic/read'] && ( )} diff --git a/client/src/utils/converters.js b/client/src/utils/converters.js index fb4d5fd7c..a3e7e6a69 100644 --- 
a/client/src/utils/converters.js +++ b/client/src/utils/converters.js @@ -139,4 +139,22 @@ export function organizeRoles(roles) { return JSON.stringify(newRoles); } +export function transformListObjsToViewOptions(list, id, name) { + return list.map(elem => { + return { + _id: elem[id], + name: elem[name] + }; + }); +} + +export function transformStringArrayToViewOptions(list) { + return list.map(elem => { + return { + _id: elem, + name: elem + }; + }); +} + export default { showTime, showBytes }; diff --git a/client/src/utils/endpoints.js b/client/src/utils/endpoints.js index db3516e0f..7d366feec 100644 --- a/client/src/utils/endpoints.js +++ b/client/src/utils/endpoints.js @@ -35,6 +35,10 @@ export const uriTopics = (clusterId, search, show, page) => { return `${apiUrl}/${clusterId}/topic?search=${search}&show=${show}&page=${page}`; }; +export const uriTopicsName = (clusterId) => `${apiUrl}/${clusterId}/topic/name`; + +export const uriTopicsInfo = (clusterId, topicId) => `${apiUrl}/${clusterId}/topic/${topicId}`; + export const uriTopicsCreate = clusterId => `${apiUrl}/${clusterId}/topic`; export const uriTopicsProduce = (clusterId, topicName) => @@ -92,6 +96,15 @@ export const uriTopicsUpdateConfigs = (clusterId, topicId) => { return `${apiUrl}/${clusterId}/topic/${topicId}/configs`; }; +export const uriTopicsOffsetsByTimestamp = (clusterId, topicId, timestamp) => { + return `${apiUrl}/${clusterId}/topic/${topicId}/offsets/start?timestamp=${timestamp}`; +}; + +export const uriTopicsCopy = (fromClusterId, fromTopicId, toClusterId, toTopicId) => { + return `${apiUrl}/${fromClusterId}/topic/${fromTopicId}/copy/${toClusterId}/topic/${toTopicId}`; +} + + export const uriConnects = id => { return `${apiUrl}/connects${id ? 
'?clusterId=' + id : ''}`; }; diff --git a/src/main/java/org/akhq/controllers/TopicController.java b/src/main/java/org/akhq/controllers/TopicController.java index 1cd5264ae..f2c060c1f 100644 --- a/src/main/java/org/akhq/controllers/TopicController.java +++ b/src/main/java/org/akhq/controllers/TopicController.java @@ -4,13 +4,17 @@ import com.google.common.collect.ImmutableMap; import io.micronaut.context.annotation.Value; import io.micronaut.context.env.Environment; +import io.micronaut.core.util.CollectionUtils; +import io.micronaut.core.util.StringUtils; import io.micronaut.http.HttpRequest; import io.micronaut.http.HttpResponse; import io.micronaut.http.MediaType; +import io.micronaut.http.annotation.Body; import io.micronaut.http.annotation.Controller; import io.micronaut.http.annotation.Delete; import io.micronaut.http.annotation.Get; import io.micronaut.http.annotation.Post; +import io.micronaut.http.annotation.QueryValue; import io.micronaut.http.sse.Event; import io.micronaut.security.annotation.Secured; import io.swagger.v3.oas.annotations.Operation; @@ -24,8 +28,11 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import javax.inject.Inject; + +import lombok.AllArgsConstructor; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.akhq.configs.Role; @@ -36,6 +43,7 @@ import org.akhq.models.Partition; import org.akhq.models.Record; import org.akhq.models.Topic; +import org.akhq.models.TopicPartition; import org.akhq.modules.AbstractKafkaWrapper; import org.akhq.repositories.AccessControlListRepository; import org.akhq.repositories.ConfigRepository; @@ -102,6 +110,17 @@ public ResultPagedList list( )); } + @Get("api/{cluster}/topic/name") + @Operation(tags = {"topic"}, summary = "List all topics name") + public List listTopicNames( + HttpRequest request, + String cluster, + Optional show + ) throws ExecutionException, 
InterruptedException { + return this.topicRepository.all(cluster, show.orElse(TopicRepository.TopicListView.valueOf(defaultView)), Optional.empty()); + } + + @Secured(Role.ROLE_TOPIC_INSERT) @Post(value = "api/{cluster}/topic") @Operation(tags = {"topic"}, summary = "Create a topic") @@ -353,6 +372,68 @@ public ResultNextList record( ); } + @Secured(Role.ROLE_TOPIC_DATA_READ) + @Get("api/{cluster}/topic/{topicName}/offsets/start") + @Operation(tags = {"topic data"}, summary = "Get topic partition offsets by timestamp") + public List offsetsStart(String cluster, String topicName, Optional timestamp) throws ExecutionException, InterruptedException { + Topic topic = this.topicRepository.findByName(cluster, topicName); + + return recordRepository.getOffsetForTime( + cluster, + topic.getPartitions() + .stream() + .map(r -> new TopicPartition(r.getTopic(), r.getId())) + .collect(Collectors.toList()), + timestamp.orElse(Instant.now()).toEpochMilli() + ); + } + + + @Secured(Role.ROLE_TOPIC_DATA_INSERT) + @Post("api/{fromCluster}/topic/{fromTopicName}/copy/{toCluster}/topic/{toTopicName}") + @Operation(tags = {"topic data"}, summary = "Copy from a topic to another topic") + public RecordRepository.CopyResult copy( + HttpRequest request, + String fromCluster, + String fromTopicName, + String toCluster, + String toTopicName, + @Body List offsets + ) throws ExecutionException, InterruptedException { + Topic fromTopic = this.topicRepository.findByName(fromCluster, fromTopicName); + Topic toTopic = this.topicRepository.findByName(toCluster, toTopicName); + + if (!CollectionUtils.isNotEmpty(offsets)) { + throw new IllegalArgumentException("Empty collections"); + } + + // after wait for next offset, so add - 1 to allow to have the current offset + String offsetsList = offsets.stream() + .filter(offsetCopy -> offsetCopy.offset - 1 >= 0) + .map(offsetCopy -> + String.join("-", String.valueOf(offsetCopy.partition), String.valueOf(offsetCopy.offset - 1))) + 
.collect(Collectors.joining("_")); + + RecordRepository.Options options = dataSearchOptions( + fromCluster, + fromTopicName, + Optional.ofNullable(StringUtils.isNotEmpty(offsetsList) ? offsetsList : null), + Optional.empty(), + Optional.empty(), + Optional.empty(), + Optional.empty() + ); + + return this.recordRepository.copy(fromTopic, toCluster, toTopic, offsets, options); + } + + @ToString + @EqualsAndHashCode + @Getter + public static class CopyResponse { + int records; + } + private RecordRepository.Options dataSearchOptions( String cluster, String topicName, @@ -392,4 +473,12 @@ public SearchRecord(double percent, String after) { @JsonProperty("after") private final String after; } + + @NoArgsConstructor + @AllArgsConstructor + @Getter + public static class OffsetCopy { + private int partition; + private long offset; + } } diff --git a/src/main/java/org/akhq/repositories/RecordRepository.java b/src/main/java/org/akhq/repositories/RecordRepository.java index 340e10528..4759fcd15 100644 --- a/src/main/java/org/akhq/repositories/RecordRepository.java +++ b/src/main/java/org/akhq/repositories/RecordRepository.java @@ -9,6 +9,7 @@ import io.reactivex.Flowable; import lombok.*; import lombok.extern.slf4j.Slf4j; +import org.akhq.controllers.TopicController; import org.apache.kafka.clients.admin.DeletedRecords; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -16,6 +17,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaFuture; @@ -29,6 +31,7 @@ import org.akhq.modules.KafkaModule; import org.akhq.utils.Debug; + import java.util.*; import 
java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -58,6 +61,9 @@ public class RecordRepository extends AbstractRepository { @Value("${akhq.topic-data.poll-timeout:1000}") protected int pollTimeout; + @Value("${akhq.clients-defaults.consumer.properties.max.poll.records:50}") + protected int maxPollRecords; + public List consume(String clusterId, Options options) throws ExecutionException, InterruptedException { return Debug.call(() -> { Topic topicsDetail = topicRepository.findByName(clusterId, options.topic); @@ -719,6 +725,75 @@ public Flowable> tail(String clusterId, TailOptions options) { }); } + public CopyResult copy(Topic fromTopic, String toClusterId, Topic toTopic, List offsets, RecordRepository.Options options) { + KafkaConsumer consumer = this.kafkaModule.getConsumer( + options.clusterId, + new Properties() {{ + put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); + }} + ); + + Map partitions = getTopicPartitionForSortOldest(fromTopic, options, consumer); + + Map filteredPartitions = partitions.entrySet().stream() + .filter(topicPartitionLongEntry -> offsets.stream() + .anyMatch(offsetCopy -> offsetCopy.getPartition() == topicPartitionLongEntry.getKey().partition())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + int counter = 0; + + if (filteredPartitions.size() > 0) { + consumer.assign(filteredPartitions.keySet()); + filteredPartitions.forEach(consumer::seek); + + if (log.isTraceEnabled()) { + filteredPartitions.forEach((topicPartition, first) -> + log.trace( + "Consume [topic: {}] [partition: {}] [start: {}]", + topicPartition.topic(), + topicPartition.partition(), + first + ) + ); + } + + boolean samePartition = toTopic.getPartitions().size() == fromTopic.getPartitions().size(); + + KafkaProducer producer = kafkaModule.getProducer(toClusterId); + ConsumerRecords records; + do { + records = this.poll(consumer); + for (ConsumerRecord record : records) { + 
log.trace("Copy [partition: {}] [offset: {}]", record.partition(), record.offset());
+
+                    counter++;
+                    producer.send(new ProducerRecord<>(
+                        toTopic.getName(),
+                        samePartition ? record.partition() : null,
+                        record.timestamp(),
+                        record.key(),
+                        record.value(),
+                        record.headers()
+                    ));
+                }
+
+            } while (!records.isEmpty());
+
+            producer.flush();
+        }
+        consumer.close();
+
+        return new CopyResult(counter);
+    }
+
+    @ToString
+    @EqualsAndHashCode
+    @AllArgsConstructor
+    @Getter
+    public static class CopyResult {
+        int records;
+    }
+
     @ToString
     @EqualsAndHashCode
     @Getter