Commit

update/learn

杜文凯 committed Jul 16, 2022
1 parent 3ad70d2 commit 9a2e5d0
Showing 22 changed files with 753 additions and 209 deletions.
6 changes: 6 additions & 0 deletions 人工智能/变更检测/数据同步机制.md
@@ -0,0 +1,6 @@
# Distributed Data Synchronization

https://lotabout.me/2019/Data-Synchronization-in-Distributed-System/


https://highlyscalable.wordpress.com/2013/08/20/in-stream-big-data-processing/
36 changes: 36 additions & 0 deletions 人工智能/算法/蚁群算法/ACO.py
@@ -0,0 +1,36 @@
import numpy as np
import matplotlib.pyplot as plt


class ACO:
    def __init__(self, parameters):
        """
        Ant Colony Optimization (Ant-Cycle model).
        parameters: [NGEN, pop_size, var_num_min, var_num_max]
        """
        self.NGEN = parameters[0]          # number of generations
        self.pop_size = parameters[1]      # number of ants
        self.var_num = len(parameters[2])  # number of decision variables
        self.bound = []                    # bound[0]: lower bounds, bound[1]: upper bounds
        self.bound.append(parameters[2])
        self.bound.append(parameters[3])

        self.pop_x = np.zeros((self.pop_size, self.var_num))  # positions of all ants
        self.g_best = np.zeros((1, self.var_num))              # global best position

        temp = -1
        for i in range(self.pop_size):
            for j in range(self.var_num):
                # sample each variable uniformly between its lower and upper bound
                self.pop_x[i][j] = np.random.uniform(self.bound[0][j], self.bound[1][j])
            fit = self.fitness(self.pop_x[i])
            if fit > temp:
                self.g_best = self.pop_x[i]
                temp = fit

    def fitness(self, index_var):
        x1 = index_var[0]
        x2 = index_var[1]
        x3 = index_var[2]
        x4 = index_var[3]
        y = x1 ** 2 + x2 ** 2 + x3 ** 3 + x4 ** 4
        return y
17 changes: 17 additions & 0 deletions 人工智能/算法/蚁群算法/蚁群算法.md
@@ -0,0 +1,17 @@
# Four Major Heuristic Algorithms

# Ant Colony Optimization (ACO)


The three classic pheromone-update variants:

- Ant-Cycle model
- Ant-Quantity model
- Ant-Density model
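
For reference, the three variants differ only in how an ant k deposits pheromone on an edge (i, j). A standard Ant System formulation (not taken from the linked posts; Q is a constant, L_k is the length of ant k's tour, d_ij is the length of the edge) is:

```latex
\Delta\tau_{ij}^{k} =
\begin{cases}
  Q / L_{k}  & \text{Ant-Cycle: deposited after ant } k \text{ finishes its tour, on every edge of the tour} \\
  Q / d_{ij} & \text{Ant-Quantity: deposited while ant } k \text{ traverses edge } (i, j) \\
  Q          & \text{Ant-Density: deposited while ant } k \text{ traverses edge } (i, j)
\end{cases}
```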

https://bbs.huaweicloud.com/blogs/354067


https://finthon.com/python-aco/

# A* Pathfinding Algorithm

https://blog.csdn.net/u014361280/article/details/104740876
31 changes: 21 additions & 10 deletions 大数据/hadoop/大数据实战.md
@@ -63,12 +63,23 @@ ocal coaa.sample.txt /hadoop/ch2mkdir: Permission denied: user=root, access=WRIT

1. Set the HDFS parameter dfs.permissions=false, granting permissions to root.

2. Restart the HDFS components so the change takes effect.

3. After a successful restart, log in to the shell again; running as root now works.

This disables HDFS permission checking.

Create the corresponding user directories:
```sh
# Run as the hdfs superuser to create the HDFS home directory for root
sudo -u hdfs hadoop fs -mkdir /user/root
sudo -u hdfs hadoop fs -chown root /user/root
# Create the Spark application history directory and hand ownership to root
sudo -u hdfs hadoop fs -mkdir /user/spark/applicationHistory
sudo -u hdfs hadoop fs -chown root /user/spark/applicationHistory
# Full URI of the history directory:
# hdfs://cdh01.com:8020/user/spark/applicationHistory
```

https://www.cnblogs.com/yy3b2007com/p/9953191.html

https://www.cnblogs.com/yy3b2007com/p/9962099.html#autoid-5-0-0
@@ -115,7 +126,7 @@ hadoop jar ch2.noaa-1.0-SNAPSHOT.jar avg.AvgTemperature /hadoop/ch2/coaa.sample.
HDFS: Number of large read operations=0
HDFS: Number of write operations=12
HDFS: Number of bytes read erasure-coded=0
Job Counters
Launched map tasks=1
Launched reduce tasks=6
Data-local map tasks=1
@@ -159,9 +170,9 @@ hadoop jar ch2.noaa-1.0-SNAPSHOT.jar avg.AvgTemperature /hadoop/ch2/coaa.sample.
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=72000692
File Output Format Counters
Bytes Written=9
```

@@ -390,12 +401,12 @@ Welcome to
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0-cdh6.3.2
/_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
```
https://blog.csdn.net/summer089089/article/details/110640963
@@ -413,14 +424,14 @@ https://www.shangmayuan.com/a/8c82dbaac3814a6c84b2d87a.html
Data skew is very common in the MapReduce programming model. Put simply, it means that a large number of identical keys are assigned by the partitioner to a single partition, creating a situation where "one worker is worked to death while the others sit idle". This is unacceptable and defeats the purpose of parallel computing: one node is under enormous pressure while the other nodes, having finished, must wait for the busy one, which also drags out the overall computation time. The efficiency is extremely poor.

Symptoms when data skew occurs:

1. The vast majority of tasks finish very quickly, but a few individual tasks run extremely slowly.

2. A Spark job that used to run normally suddenly throws an OOM (out of memory) exception one day, and the stack trace points to our own business code.

How data skew arises:

During a shuffle, all records with the same key must be pulled from the various nodes to a single task on one node for processing, for example for a key-based aggregation or a join. If one key carries a disproportionately large amount of data, skew occurs. For instance, if most keys map to 10 records each but a few keys map to 1,000,000 records, then most tasks receive only 10 records while a few tasks receive 1,000,000. Since the overall progress of a Spark job is determined by its longest-running task, a skewed job appears to run very slowly and may even hit an OOM because a single task processes too much data.

Solutions
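
One common mitigation for a skewed aggregation is two-stage aggregation with key salting. A minimal sketch (the column names user_id/amount and the bucket count are illustrative assumptions, not from this note):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SaltedAggregation {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("salted-agg").master("local[*]").getOrCreate()
    import spark.implicits._

    // Illustrative data: "hot_key" stands in for a key with a huge number of records.
    val events = Seq(("hot_key", 1L), ("hot_key", 2L), ("cold_key", 3L)).toDF("user_id", "amount")
    val saltBuckets = 8

    // Stage 1: append a random salt so a single hot key is spread across several shuffle partitions.
    val partial = events
      .withColumn("salt", (rand() * saltBuckets).cast("int"))
      .groupBy($"user_id", $"salt")
      .agg(sum($"amount").as("partial_sum"))

    // Stage 2: drop the salt and combine the partial sums; each key now contributes at most saltBuckets rows.
    val result = partial
      .groupBy($"user_id")
      .agg(sum($"partial_sum").as("total"))

    result.show()
    spark.stop()
  }
}
```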
@@ -544,4 +555,4 @@ https://www.jianshu.com/p/a054cc99e0c2
# CDH Agent安装失败
https://blog.csdn.net/weixin_36485298/article/details/116469291
33 changes: 26 additions & 7 deletions 大数据/spark/spark原理.md
@@ -12,7 +12,7 @@ https://www.cnblogs.com/yy3b2007com/p/11087180.html
https://bbs.huaweicloud.com/blogs/216441


# spark driver

The Spark driver is the program that declares the transformations and actions on the data RDDs and submits these requests to the master.

@@ -123,14 +123,14 @@ https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/yarn-service/
21/10/26 18:13:16 INFO util.SignalUtils: Registered signal handler for INT
21/10/26 18:13:17 INFO spark.SecurityManager: Changing view acls to: yarn,root
21/10/26 18:13:17 INFO spark.SecurityManager: Changing modify acls to: yarn,root
21/10/26 18:13:17 INFO spark.SecurityManager: Changing view acls groups to:
21/10/26 18:13:17 INFO spark.SecurityManager: Changing modify acls groups to:
21/10/26 18:13:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, root); groups with view permissions: Set(); users with modify permissions: Set(yarn, root); groups with modify permissions: Set()
21/10/26 18:13:17 INFO client.TransportClientFactory: Successfully created connection to cdh02.com/172.19.211.141:40761 after 61 ms (0 ms spent in bootstraps)
21/10/26 18:13:17 INFO spark.SecurityManager: Changing view acls to: yarn,root
21/10/26 18:13:17 INFO spark.SecurityManager: Changing modify acls to: yarn,root
21/10/26 18:13:17 INFO spark.SecurityManager: Changing view acls groups to:
21/10/26 18:13:17 INFO spark.SecurityManager: Changing modify acls groups to:
21/10/26 18:13:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, root); groups with view permissions: Set(); users with modify permissions: Set(yarn, root); groups with modify permissions: Set()
21/10/26 18:13:17 INFO client.TransportClientFactory: Successfully created connection to cdh02.com/172.19.211.141:40761 after 1 ms (0 ms spent in bootstraps)
21/10/26 18:13:17 INFO storage.DiskBlockManager: Created local directory at /yarn/nm/usercache/root/appcache/application_1635242602037_0001/blockmgr-decb13d0-488e-45ae-8f95-88af06ff54d5
@@ -217,7 +217,7 @@ https://blog.csdn.net/Young2018/article/details/108856622

To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain-specific type T to Spark's internal type system. For example, given a Person class with two fields, name (string) and age (int), the encoder tells Spark to generate code at runtime that serializes a Person object into a binary structure. This binary structure usually has a much smaller memory footprint and is optimized for data-processing efficiency (for example, a columnar format). To inspect the internal binary representation of the data, use the schema function.
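
A minimal sketch of this (the Person class and values are illustrative): the implicits imported from the session provide the Encoder for a case class, and schema exposes the structure Spark derived for the binary representation.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative domain class; its encoder is derived automatically via spark.implicits._
case class Person(name: String, age: Int)

object EncoderDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("encoder-demo").master("local[*]").getOrCreate()
    import spark.implicits._ // brings Encoder[Person] into scope

    val people = Seq(Person("alice", 30), Person("bob", 25)).toDS()
    people.schema.printTreeString() // prints the derived schema: name (string), age (integer)
    people.show()

    spark.stop()
  }
}
```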

static Dataset<Row> ofRows(SparkSession sparkSession, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan)
static Dataset<Row> ofRows(SparkSession sparkSession, org.apache.spark.sql.catalyst.plans.logical.LogicalPlan logicalPlan, org.apache.spark.sql.catalyst.QueryPlanningTracker tracker)

A variant of ofRows that allows a tracker to be passed in so that query parsing time can be tracked.
@@ -267,4 +267,23 @@ https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html

https://zhuanlan.zhihu.com/p/28893155

https://zhuanlan.zhihu.com/p/55127453


# Pitfall: Joining DataFrames Derived from the Same Source
When DataFrames that derive from the same parent DataFrame are joined with each other, some unexpected errors can occur.

There are several ways to work around this problem (a sketch of the alias approach follows the list):

- Use a SQL expression

- Alias the DataFrames
- Rename columns with withColumn
- Redefine the schema of one of the DataFrames with toDF
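
A minimal sketch of the alias approach (the column names id/value are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object SelfJoinAlias {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("self-join-alias").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "value")

    // Both sides derive from the same DataFrame; aliases make the column references unambiguous.
    val joined = df.alias("l")
      .join(df.alias("r"), col("l.id") === col("r.id"))
      .select(col("l.id"), col("l.value").as("left_value"), col("r.value").as("right_value"))

    joined.show()
    spark.stop()
  }
}
```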

https://liketea.xyz/Spark/Spark/Spark%20%E6%8C%87%E5%8D%97%EF%BC%9ASpark%20SQL%EF%BC%88%E4%BA%8C%EF%BC%89%E2%80%94%E2%80%94%20%E7%BB%93%E6%9E%84%E5%8C%96%E6%93%8D%E4%BD%9C/


# MySQL data_type Base64 Columns

https://marsishandsome.github.io/2019/10/Base64
7 changes: 7 additions & 0 deletions 大数据/spark/spark调度.md
@@ -0,0 +1,7 @@
# Spark on YARN Scheduling



https://blog.coderap.com/article/309

https://www.cnblogs.com/xia520pi/p/8695141.html
57 changes: 57 additions & 0 deletions 大数据/spark/分享/byzer执行核心业务逻辑.md
@@ -0,0 +1,57 @@
1. The API layer receives the SQL statement (the Scala excerpt below is abridged; surrounding definitions are omitted)

```scala
@At(path = Array("/run/script"), types = Array(GET, POST)){ // abridged excerpt of the /run/script handler; the annotated method definition is elided
val sparkSession = getSession

accessAuth(sparkSession)
val jobInfo = JobManager.getJobInfo(
param("owner"), param("jobType", MLSQLJobType.SCRIPT), param("jobName"), param("sql"),
paramAsLong("timeout", -1L)
)
val context = createScriptSQLExecListener(sparkSession, jobInfo.groupId)

def query = {
if (paramAsBoolean("async", false)) {
JobManager.asyncRun(sparkSession, jobInfo, () => {
val urlString = param("callback")
val maxTries = Math.max(0, paramAsInt("maxRetries", -1)) + 1
try {
ScriptSQLExec.parse(param("sql"), context,
skipInclude = paramAsBoolean("skipInclude", false),
skipAuth = paramAsBoolean("skipAuth", true),
skipPhysicalJob = paramAsBoolean("skipPhysicalJob", false),
skipGrammarValidate = paramAsBoolean("skipGrammarValidate", true))

outputResult = getScriptResult(context, sparkSession)

executeWithRetrying[HttpResponse](maxTries)(
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""succeeded""",
"res" -> outputResult,
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
HttpStatus.SC_OK == _.getStatusLine.getStatusCode,
response => logger.error(s"Succeeded SQL callback request failed after ${maxTries} attempts, " +
s"the last response status is: ${response.getStatusLine.getStatusCode}.")
)
}
// ... catch/finally and the rest of the handler are omitted in this excerpt
```
2. Create the script SQL execution listener
```scala
private def createScriptSQLExecListener(sparkSession: SparkSession, groupId: String) = {

val allPathPrefix = fromJson(param("allPathPrefix", "{}"), classOf[Map[String, String]])
val defaultPathPrefix = param("defaultPathPrefix", "")
val context = new ScriptSQLExecListener(sparkSession, defaultPathPrefix, allPathPrefix)
val ownerOption = if (params.containsKey("owner")) Some(param("owner")) else None
val userDefineParams = params.toMap.filter(f => f._1.startsWith("context.")).map(f => (f._1.substring("context.".length), f._2))
ScriptSQLExec.setContext(new MLSQLExecuteContext(context, param("owner"), context.pathPrefix(None), groupId,
userDefineParams ++ Map("__PARAMS__" -> JSONTool.toJsonStr(params().toMap))
))
context.initFromSessionEnv
context.addEnv("SKIP_AUTH", param("skipAuth", "true"))
context.addEnv("HOME", context.pathPrefix(None))
context.addEnv("OWNER", ownerOption.getOrElse("anonymous"))
context
}
```